source: trunk/libtransmission/peer-mgr.c @ 7154

Last change on this file since 7154 was 7154, checked in by charles, 12 years ago

(libT) yet another stab at getting bandwidth management under control. this version may suck less than previous attempts. It also breaks the mac build until someone adds iobuf.[ch] to xcode...

  • Property svn:keywords set to Date Rev Author Id
File size: 65.3 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 7154 2008-11-25 21:35:17Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h> /* memcpy, memcmp, strstr */
16#include <stdlib.h> /* qsort */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "bandwidth.h"
23#include "bencode.h"
24#include "blocklist.h"
25#include "clients.h"
26#include "completion.h"
27#include "crypto.h"
28#include "handshake.h"
29#include "inout.h" /* tr_ioTestPiece */
30#include "net.h"
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-mgr-private.h"
34#include "peer-msgs.h"
35#include "ptrarray.h"
36#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
37#include "torrent.h"
38#include "trevent.h"
39#include "utils.h"
40#include "webseed.h"
41
42enum
43{
44    /* how frequently to change which peers are choked */
45    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
46
47    /* minimum interval for refilling peers' request lists */
48    REFILL_PERIOD_MSEC = 333,
49
50    /* when many peers are available, keep idle ones this long */
51    MIN_UPLOAD_IDLE_SECS = ( 60 * 3 ),
52
53    /* when few peers are available, keep idle ones this long */
54    MAX_UPLOAD_IDLE_SECS = ( 60 * 10 ),
55
56    /* how frequently to decide which peers live and die */
57    RECONNECT_PERIOD_MSEC = ( 2 * 1000 ),
58   
59    /* how frequently to reallocate bandwidth */
60    BANDWIDTH_PERIOD_MSEC = 200,
61
62    /* max # of peers to ask fer per torrent per reconnect pulse */
63    MAX_RECONNECTIONS_PER_PULSE = 4,
64
65    /* max number of peers to ask for per second overall.
66    * this throttle is to avoid overloading the router */
67    MAX_CONNECTIONS_PER_SECOND = 8,
68
69    /* number of unchoked peers per torrent.
70     * FIXME: this probably ought to be configurable */
71    MAX_UNCHOKED_PEERS = 14,
72
73    /* number of bad pieces a peer is allowed to send before we ban them */
74    MAX_BAD_PIECES_PER_PEER = 5,
75
76    /* use for bitwise operations w/peer_atom.myflags */
77    MYFLAG_BANNED = 1,
78
79    /* unreachable for now... but not banned.
80     * if they try to connect to us it's okay */
81    MYFLAG_UNREACHABLE = 2,
82
83    /* the minimum we'll wait before attempting to reconnect to a peer */
84    MINIMUM_RECONNECT_INTERVAL_SECS = 5
85};
86
87
88/**
89***
90**/
91
92/* We keep one of these for every peer we know about, whether
93 * it's connected or not, so the struct must be small.
94 * When our current connections underperform, we dip back
95 * into this list for new ones. */
96struct peer_atom
97{
98    uint8_t           from;
99    uint8_t           flags; /* these match the added_f flags */
100    uint8_t           myflags; /* flags that aren't defined in added_f */
101    uint16_t          port;
102    uint16_t          numFails;
103    struct in_addr    addr;
104    time_t            time; /* when the peer's connection status last changed */
105    time_t            piece_data_time;
106};
107
108typedef struct
109{
110    unsigned int    isRunning : 1;
111
112    uint8_t         hash[SHA_DIGEST_LENGTH];
113    int         *   pendingRequestCount;
114    tr_ptrArray *   outgoingHandshakes; /* tr_handshake */
115    tr_ptrArray *   pool; /* struct peer_atom */
116    tr_ptrArray *   peers; /* tr_peer */
117    tr_ptrArray *   webseeds; /* tr_webseed */
118    tr_timer *      reconnectTimer;
119    tr_timer *      rechokeTimer;
120    tr_timer *      refillTimer;
121    tr_torrent *    tor;
122    tr_peer *       optimistic; /* the optimistic peer, or NULL if none */
123
124    struct tr_peerMgr * manager;
125}
126Torrent;
127
128struct tr_peerMgr
129{
130    tr_session      * session;
131    tr_ptrArray     * torrents; /* Torrent */
132    tr_ptrArray     * incomingHandshakes; /* tr_handshake */
133    tr_timer        * bandwidthTimer;
134};
135
136#define tordbg( t, ... ) \
137    do { \
138        if( tr_deepLoggingIsActive( ) ) \
139            tr_deepLog( __FILE__, __LINE__, t->tor->info.name, __VA_ARGS__ ); \
140    } while( 0 )
141
142#define dbgmsg( ... ) \
143    do { \
144        if( tr_deepLoggingIsActive( ) ) \
145            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
146    } while( 0 )
147
148/**
149***
150**/
151
152static void
153managerLock( const struct tr_peerMgr * manager )
154{
155    tr_globalLock( manager->session );
156}
157
158static void
159managerUnlock( const struct tr_peerMgr * manager )
160{
161    tr_globalUnlock( manager->session );
162}
163
164static void
165torrentLock( Torrent * torrent )
166{
167    managerLock( torrent->manager );
168}
169
170static void
171torrentUnlock( Torrent * torrent )
172{
173    managerUnlock( torrent->manager );
174}
175
176static int
177torrentIsLocked( const Torrent * t )
178{
179    return tr_globalIsLocked( t->manager->session );
180}
181
182/**
183***
184**/
185
186static int
187compareAddresses( const struct in_addr * a,
188                  const struct in_addr * b )
189{
190    if( a->s_addr != b->s_addr )
191        return a->s_addr < b->s_addr ? -1 : 1;
192
193    return 0;
194}
195
196static int
197handshakeCompareToAddr( const void * va,
198                        const void * vb )
199{
200    const tr_handshake * a = va;
201
202    return compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
203}
204
205static int
206handshakeCompare( const void * a,
207                  const void * b )
208{
209    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
210}
211
212static tr_handshake*
213getExistingHandshake( tr_ptrArray *          handshakes,
214                      const struct in_addr * in_addr )
215{
216    return tr_ptrArrayFindSorted( handshakes,
217                                  in_addr,
218                                  handshakeCompareToAddr );
219}
220
221static int
222comparePeerAtomToAddress( const void * va,
223                          const void * vb )
224{
225    const struct peer_atom * a = va;
226
227    return compareAddresses( &a->addr, vb );
228}
229
230static int
231comparePeerAtoms( const void * va,
232                  const void * vb )
233{
234    const struct peer_atom * b = vb;
235
236    return comparePeerAtomToAddress( va, &b->addr );
237}
238
239/**
240***
241**/
242
243static int
244torrentCompare( const void * va,
245                const void * vb )
246{
247    const Torrent * a = va;
248    const Torrent * b = vb;
249
250    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
251}
252
253static int
254torrentCompareToHash( const void * va,
255                      const void * vb )
256{
257    const Torrent * a = va;
258    const uint8_t * b_hash = vb;
259
260    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
261}
262
263static Torrent*
264getExistingTorrent( tr_peerMgr *    manager,
265                    const uint8_t * hash )
266{
267    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
268                                             hash,
269                                             torrentCompareToHash );
270}
271
272static int
273peerCompare( const void * va,
274             const void * vb )
275{
276    const tr_peer * a = va;
277    const tr_peer * b = vb;
278
279    return compareAddresses( &a->in_addr, &b->in_addr );
280}
281
282static int
283peerCompareToAddr( const void * va,
284                   const void * vb )
285{
286    const tr_peer * a = va;
287
288    return compareAddresses( &a->in_addr, vb );
289}
290
291static tr_peer*
292getExistingPeer( Torrent *              torrent,
293                 const struct in_addr * in_addr )
294{
295    assert( torrentIsLocked( torrent ) );
296    assert( in_addr );
297
298    return tr_ptrArrayFindSorted( torrent->peers,
299                                  in_addr,
300                                  peerCompareToAddr );
301}
302
303static struct peer_atom*
304getExistingAtom( const                  Torrent * t,
305                 const struct in_addr * addr )
306{
307    assert( torrentIsLocked( t ) );
308    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
309}
310
311static int
312peerIsInUse( const Torrent *        ct,
313             const struct in_addr * addr )
314{
315    Torrent * t = (Torrent*) ct;
316
317    assert( torrentIsLocked ( t ) );
318
319    return getExistingPeer( t, addr )
320           || getExistingHandshake( t->outgoingHandshakes, addr )
321           || getExistingHandshake( t->manager->incomingHandshakes, addr );
322}
323
324static tr_peer*
325peerConstructor( tr_torrent * tor, const struct in_addr * in_addr )
326{
327    tr_peer * p;
328
329    p = tr_new0( tr_peer, 1 );
330    memcpy( &p->in_addr, in_addr, sizeof( struct in_addr ) );
331    p->bandwidth = tr_bandwidthNew( tor->session, tor->bandwidth );
332    return p;
333}
334
335static tr_peer*
336getPeer( Torrent *              torrent,
337         const struct in_addr * in_addr )
338{
339    tr_peer * peer;
340
341    assert( torrentIsLocked( torrent ) );
342
343    peer = getExistingPeer( torrent, in_addr );
344
345    if( peer == NULL )
346    {
347        peer = peerConstructor( torrent->tor, in_addr );
348        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
349    }
350
351    return peer;
352}
353
354static void
355peerDestructor( tr_peer * peer )
356{
357    assert( peer );
358    assert( peer->msgs );
359
360    tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
361    tr_peerMsgsFree( peer->msgs );
362
363    tr_peerIoFree( peer->io );
364
365    tr_bitfieldFree( peer->have );
366    tr_bitfieldFree( peer->blame );
367    tr_free( peer->client );
368
369    tr_bandwidthFree( peer->bandwidth );
370
371    tr_free( peer );
372}
373
374static void
375removePeer( Torrent * t,
376            tr_peer * peer )
377{
378    tr_peer *          removed;
379    struct peer_atom * atom;
380
381    assert( torrentIsLocked( t ) );
382
383    atom = getExistingAtom( t, &peer->in_addr );
384    assert( atom );
385    atom->time = time( NULL );
386
387    removed = tr_ptrArrayRemoveSorted( t->peers, peer, peerCompare );
388    assert( removed == peer );
389    peerDestructor( removed );
390}
391
392static void
393removeAllPeers( Torrent * t )
394{
395    while( !tr_ptrArrayEmpty( t->peers ) )
396        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
397}
398
399static void
400torrentDestructor( void * vt )
401{
402    Torrent * t = vt;
403    uint8_t   hash[SHA_DIGEST_LENGTH];
404
405    assert( t );
406    assert( !t->isRunning );
407    assert( t->peers );
408    assert( torrentIsLocked( t ) );
409    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
410    assert( tr_ptrArrayEmpty( t->peers ) );
411
412    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
413
414    tr_timerFree( &t->reconnectTimer );
415    tr_timerFree( &t->rechokeTimer );
416    tr_timerFree( &t->refillTimer );
417
418    tr_ptrArrayFree( t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
419    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
420    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
421    tr_ptrArrayFree( t->peers, NULL );
422
423    tr_free( t->pendingRequestCount );
424    tr_free( t );
425}
426
427static void peerCallbackFunc( void * vpeer,
428                              void * vevent,
429                              void * vt );
430
431static Torrent*
432torrentConstructor( tr_peerMgr * manager,
433                    tr_torrent * tor )
434{
435    int       i;
436    Torrent * t;
437
438    t = tr_new0( Torrent, 1 );
439    t->manager = manager;
440    t->tor = tor;
441    t->pool = tr_ptrArrayNew( );
442    t->peers = tr_ptrArrayNew( );
443    t->webseeds = tr_ptrArrayNew( );
444    t->outgoingHandshakes = tr_ptrArrayNew( );
445    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
446
447    for( i = 0; i < tor->info.webseedCount; ++i )
448    {
449        tr_webseed * w =
450            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc,
451                           t );
452        tr_ptrArrayAppend( t->webseeds, w );
453    }
454
455    return t;
456}
457
458/**
459 * For explanation, see http://www.bittorrent.org/fast_extensions.html
460 * Also see the "test-allowed-set" unit test
461 *
462 * @param k number of pieces in set
463 * @param sz number of pieces in the torrent
464 * @param infohash torrent's SHA1 hash
465 * @param ip peer's address
466 */
467struct tr_bitfield *
468tr_peerMgrGenerateAllowedSet(
469    const uint32_t         k,
470    const uint32_t         sz,
471    const                  uint8_t        *
472                           infohash,
473    const struct in_addr * ip )
474{
475    uint8_t       w[SHA_DIGEST_LENGTH + 4];
476    uint8_t       x[SHA_DIGEST_LENGTH];
477    tr_bitfield * a;
478    uint32_t      a_size;
479
480    *(uint32_t*)w = ntohl( htonl( ip->s_addr ) & 0xffffff00 );   /* (1) */
481    memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );              /* (2) */
482    tr_sha1( x, w, sizeof( w ), NULL );                        /* (3) */
483
484    a = tr_bitfieldNew( sz );
485    a_size = 0;
486
487    while( a_size < k )
488    {
489        int i;
490        for( i = 0; i < 5 && a_size < k; ++i )                      /* (4) */
491        {
492            uint32_t j = i * 4;                                /* (5) */
493            uint32_t y = ntohl( *( uint32_t* )( x + j ) );             /* (6) */
494            uint32_t index = y % sz;                           /* (7) */
495            if( !tr_bitfieldHas( a, index ) )                  /* (8) */
496            {
497                tr_bitfieldAdd( a, index );                    /* (9) */
498                ++a_size;
499            }
500        }
501        tr_sha1( x, x, sizeof( x ), NULL );                    /* (3) */
502    }
503
504    return a;
505}
506
507static int bandwidthPulse( void * vmgr );
508
509
510tr_peerMgr*
511tr_peerMgrNew( tr_session * session )
512{
513    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
514
515    m->session = session;
516    m->torrents = tr_ptrArrayNew( );
517    m->incomingHandshakes = tr_ptrArrayNew( );
518    m->bandwidthTimer = tr_timerNew( session, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
519    return m;
520}
521
522void
523tr_peerMgrFree( tr_peerMgr * manager )
524{
525    managerLock( manager );
526
527    tr_timerFree( &manager->bandwidthTimer );
528
529    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
530     * the item from manager->handshakes, so this is a little roundabout... */
531    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
532        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
533
534    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
535
536    /* free the torrents. */
537    tr_ptrArrayFree( manager->torrents, torrentDestructor );
538
539    managerUnlock( manager );
540    tr_free( manager );
541}
542
543static tr_peer**
544getConnectedPeers( Torrent * t,
545                   int *     setmeCount )
546{
547    int       i, peerCount, connectionCount;
548    tr_peer **peers;
549    tr_peer **ret;
550
551    assert( torrentIsLocked( t ) );
552
553    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
554    ret = tr_new( tr_peer *, peerCount );
555
556    for( i = connectionCount = 0; i < peerCount; ++i )
557        if( peers[i]->msgs )
558            ret[connectionCount++] = peers[i];
559
560    *setmeCount = connectionCount;
561    return ret;
562}
563
564static int
565clientIsDownloadingFrom( const tr_peer * peer )
566{
567    return peer->clientIsInterested && !peer->clientIsChoked;
568}
569
570static int
571clientIsUploadingTo( const tr_peer * peer )
572{
573    return peer->peerIsInterested && !peer->peerIsChoked;
574}
575
576/***
577****
578***/
579
580int
581tr_peerMgrPeerIsSeed( const tr_peerMgr *     mgr,
582                      const uint8_t *        torrentHash,
583                      const struct in_addr * addr )
584{
585    int                      isSeed = FALSE;
586    const Torrent *          t = NULL;
587    const struct peer_atom * atom = NULL;
588
589    t = getExistingTorrent( (tr_peerMgr*)mgr, torrentHash );
590    if( t )
591        atom = getExistingAtom( t, addr );
592    if( atom )
593        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
594
595    return isSeed;
596}
597
598/****
599*****
600*****  REFILL
601*****
602****/
603
604static void
605assertValidPiece( Torrent * t, tr_piece_index_t piece )
606{
607    assert( t );
608    assert( t->tor );
609    assert( piece < t->tor->info.pieceCount );
610}
611
612static int
613getPieceRequests( Torrent * t, tr_piece_index_t piece )
614{
615    assertValidPiece( t, piece );
616
617    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
618}
619
620static void
621incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
622{
623    assertValidPiece( t, piece );
624
625    if( t->pendingRequestCount == NULL )
626        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
627    t->pendingRequestCount[piece]++;
628}
629
630static void
631decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
632{
633    assertValidPiece( t, piece );
634
635    if( t->pendingRequestCount )
636        t->pendingRequestCount[piece]--;
637}
638
639struct tr_refill_piece
640{
641    tr_priority_t    priority;
642    uint32_t         piece;
643    uint32_t         peerCount;
644    int              random;
645    int              pendingRequestCount;
646    int              missingBlockCount;
647};
648
649static int
650compareRefillPiece( const void * aIn,
651                    const void * bIn )
652{
653    const struct tr_refill_piece * a = aIn;
654    const struct tr_refill_piece * b = bIn;
655
656    /* if one piece has a higher priority, it goes first */
657    if( a->priority != b->priority )
658        return a->priority > b->priority ? -1 : 1;
659
660    /* have a per-priority endgame */
661    if( a->pendingRequestCount != b->pendingRequestCount )
662        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
663
664    /* fewer missing pieces goes first */
665    if( a->missingBlockCount != b->missingBlockCount )
666        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
667
668    /* otherwise if one has fewer peers, it goes first */
669    if( a->peerCount != b->peerCount )
670        return a->peerCount < b->peerCount ? -1 : 1;
671
672    /* otherwise go with our random seed */
673    if( a->random != b->random )
674        return a->random < b->random ? -1 : 1;
675
676    return 0;
677}
678
679static tr_piece_index_t *
680getPreferredPieces( Torrent           * t,
681                    tr_piece_index_t  * pieceCount )
682{
683    const tr_torrent  * tor = t->tor;
684    const tr_info     * inf = &tor->info;
685    tr_piece_index_t    i;
686    tr_piece_index_t    poolSize = 0;
687    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
688    int                 peerCount;
689    tr_peer**           peers;
690
691    assert( torrentIsLocked( t ) );
692
693    peers = getConnectedPeers( t, &peerCount );
694
695    /* make a list of the pieces that we want but don't have */
696    for( i = 0; i < inf->pieceCount; ++i )
697        if( !tor->info.pieces[i].dnd
698                && !tr_cpPieceIsComplete( tor->completion, i ) )
699            pool[poolSize++] = i;
700
701    /* sort the pool by which to request next */
702    if( poolSize > 1 )
703    {
704        tr_piece_index_t j;
705        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
706
707        for( j = 0; j < poolSize; ++j )
708        {
709            int k;
710            const tr_piece_index_t piece = pool[j];
711            struct tr_refill_piece * setme = p + j;
712
713            setme->piece = piece;
714            setme->priority = inf->pieces[piece].priority;
715            setme->peerCount = 0;
716            setme->random = tr_cryptoWeakRandInt( INT_MAX );
717            setme->pendingRequestCount = getPieceRequests( t, piece );
718            setme->missingBlockCount
719                         = tr_cpMissingBlocksInPiece( tor->completion, piece );
720
721            for( k = 0; k < peerCount; ++k )
722            {
723                const tr_peer * peer = peers[k];
724                if( peer->peerIsInterested
725                        && !peer->clientIsChoked
726                        && tr_bitfieldHas( peer->have, piece ) )
727                    ++setme->peerCount;
728            }
729        }
730
731        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
732               compareRefillPiece );
733
734        for( j = 0; j < poolSize; ++j )
735            pool[j] = p[j].piece;
736
737        tr_free( p );
738    }
739
740    tr_free( peers );
741
742    *pieceCount = poolSize;
743    return pool;
744}
745
746struct tr_blockIterator
747{
748    Torrent * t;
749    tr_block_index_t blockIndex, blockCount, *blocks;
750    tr_piece_index_t pieceIndex, pieceCount, *pieces;
751};
752
753static struct tr_blockIterator*
754blockIteratorNew( Torrent * t )
755{
756    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
757    i->t = t;
758    i->pieces = getPreferredPieces( t, &i->pieceCount );
759    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCount );
760    return i;
761}
762
763static int
764blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
765{
766    int found;
767    Torrent * t = i->t;
768    tr_torrent * tor = t->tor;
769
770    while( ( i->blockIndex == i->blockCount )
771        && ( i->pieceIndex < i->pieceCount ) )
772    {
773        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
774        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
775        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
776        tr_block_index_t block;
777
778        assert( index < tor->info.pieceCount );
779
780        i->blockCount = 0;
781        i->blockIndex = 0;
782        for( block=b; block!=e; ++block )
783            if( !tr_cpBlockIsComplete( tor->completion, block ) )
784                i->blocks[i->blockCount++] = block;
785    }
786
787    if(( found = ( i->blockIndex < i->blockCount )))
788        *setme = i->blocks[i->blockIndex++];
789
790    return found;
791}
792
793static void
794blockIteratorFree( struct tr_blockIterator * i )
795{
796    tr_free( i->blocks );
797    tr_free( i->pieces );
798    tr_free( i );
799}
800
801static tr_peer**
802getPeersUploadingToClient( Torrent * t,
803                           int *     setmeCount )
804{
805    int j;
806    int peerCount = 0;
807    int retCount = 0;
808    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
809    tr_peer ** ret = tr_new( tr_peer *, peerCount );
810
811    j = 0; /* this is a temporary test to make sure we walk through all the peers */
812    if( peerCount )
813    {
814        /* Get a list of peers we're downloading from.
815           Pick a different starting point each time so all peers
816           get a chance at being the first in line */
817        const int fencepost = tr_cryptoWeakRandInt( peerCount );
818        int i = fencepost;
819        do {
820            if( clientIsDownloadingFrom( peers[i] ) )
821                ret[retCount++] = peers[i];
822            i = ( i + 1 ) % peerCount;
823            ++j;
824        } while( i != fencepost );
825    }
826    assert( j == peerCount );
827    *setmeCount = retCount;
828    return ret;
829}
830
831static uint32_t
832getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
833{
834    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
835    const uint64_t blockPos = tor->blockSize * b;
836    assert( blockPos >= piecePos );
837    return (uint32_t)( blockPos - piecePos );
838}
839
840static int
841refillPulse( void * vtorrent )
842{
843    tr_block_index_t block;
844    int peerCount;
845    int webseedCount;
846    tr_peer ** peers;
847    tr_webseed ** webseeds;
848    struct tr_blockIterator * blockIterator;
849    Torrent * t = vtorrent;
850    tr_torrent * tor = t->tor;
851
852    if( !t->isRunning )
853        return TRUE;
854    if( tr_torrentIsSeed( t->tor ) )
855        return TRUE;
856
857    torrentLock( t );
858    tordbg( t, "Refilling Request Buffers..." );
859
860    blockIterator = blockIteratorNew( t );
861    peers = getPeersUploadingToClient( t, &peerCount );
862    webseedCount = tr_ptrArraySize( t->webseeds );
863    webseeds = tr_memdup( tr_ptrArrayBase( t->webseeds ),
864                          webseedCount * sizeof( tr_webseed* ) );
865
866    while( ( webseedCount || peerCount )
867        && blockIteratorNext( blockIterator, &block ) )
868    {
869        int j;
870        int handled = FALSE;
871
872        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
873        const uint32_t offset = getBlockOffsetInPiece( tor, block );
874        const uint32_t length = tr_torBlockCountBytes( tor, block );
875
876        /* find a peer who can ask for this block */
877        for( j=0; !handled && j<peerCount; )
878        {
879            const int val = tr_peerMsgsAddRequest( peers[j]->msgs,
880                                                   index, offset, length );
881            switch( val )
882            {
883                case TR_ADDREQ_FULL:
884                case TR_ADDREQ_CLIENT_CHOKED:
885                    peers[j] = peers[--peerCount];
886                    break;
887
888                case TR_ADDREQ_MISSING:
889                case TR_ADDREQ_DUPLICATE:
890                    ++j;
891                    break;
892
893                case TR_ADDREQ_OK:
894                    incrementPieceRequests( t, index );
895                    handled = TRUE;
896                    break;
897
898                default:
899                    assert( 0 && "unhandled value" );
900                    break;
901            }
902        }
903
904        /* maybe one of the webseeds can do it */
905        for( j=0; !handled && j<webseedCount; )
906        {
907            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j],
908                                                          index, offset, length );
909            switch( val )
910            {
911                case TR_ADDREQ_FULL:
912                    webseeds[j] = webseeds[--webseedCount];
913                    break;
914
915                case TR_ADDREQ_OK:
916                    incrementPieceRequests( t, index );
917                    handled = TRUE;
918                    break;
919
920                default:
921                    assert( 0 && "unhandled value" );
922                    break;
923            }
924        }
925    }
926
927    /* cleanup */
928    blockIteratorFree( blockIterator );
929    tr_free( webseeds );
930    tr_free( peers );
931
932    t->refillTimer = NULL;
933    torrentUnlock( t );
934    return FALSE;
935}
936
937static void
938broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
939{
940    int i, size;
941    tr_peer ** peers;
942
943    assert( torrentIsLocked( t ) );
944
945    peers = getConnectedPeers( t, &size );
946    for( i=0; i<size; ++i )
947        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
948    tr_free( peers );
949}
950
951static void
952addStrike( Torrent * t,
953           tr_peer * peer )
954{
955    tordbg( t, "increasing peer %s strike count to %d",
956            tr_peerIoAddrStr( &peer->in_addr,
957                              peer->port ), peer->strikes + 1 );
958
959    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
960    {
961        struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
962        atom->myflags |= MYFLAG_BANNED;
963        peer->doPurge = 1;
964        tordbg( t, "banning peer %s",
965               tr_peerIoAddrStr( &atom->addr, atom->port ) );
966    }
967}
968
969static void
970gotBadPiece( Torrent *        t,
971             tr_piece_index_t pieceIndex )
972{
973    tr_torrent *   tor = t->tor;
974    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
975
976    tor->corruptCur += byteCount;
977    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
978}
979
980static void
981refillSoon( Torrent * t )
982{
983    if( t->refillTimer == NULL )
984        t->refillTimer = tr_timerNew( t->manager->session,
985                                      refillPulse, t,
986                                      REFILL_PERIOD_MSEC );
987}
988
989static void
990peerCallbackFunc( void * vpeer,
991                  void * vevent,
992                  void * vt )
993{
994    tr_peer *             peer = vpeer; /* may be NULL if peer is a webseed */
995    Torrent *             t = (Torrent *) vt;
996    const tr_peer_event * e = vevent;
997
998    torrentLock( t );
999
1000    switch( e->eventType )
1001    {
1002        case TR_PEER_NEED_REQ:
1003            refillSoon( t );
1004            break;
1005
1006        case TR_PEER_CANCEL:
1007            decrementPieceRequests( t, e->pieceIndex );
1008            break;
1009
1010        case TR_PEER_PEER_GOT_DATA:
1011        {
1012            const time_t now = time( NULL );
1013            tr_torrent * tor = t->tor;
1014
1015            tor->activityDate = now;
1016
1017            if( e->wasPieceData )
1018                tor->uploadedCur += e->length;
1019
1020            /* update the stats */
1021            if( e->wasPieceData )
1022                tr_statsAddUploaded( tor->session, e->length );
1023
1024            /* update our atom */
1025            if( peer ) {
1026                struct peer_atom * a = getExistingAtom( t, &peer->in_addr );
1027                a->piece_data_time = now;
1028            }
1029
1030            break;
1031        }
1032
1033        case TR_PEER_CLIENT_GOT_DATA:
1034        {
1035            const time_t now = time( NULL );
1036            tr_torrent * tor = t->tor;
1037
1038            tor->activityDate = now;
1039
1040            /* only add this to downloadedCur if we got it from a peer --
1041             * webseeds shouldn't count against our ratio.  As one tracker
1042             * admin put it, "Those pieces are downloaded directly from the
1043             * content distributor, not the peers, it is the tracker's job
1044             * to manage the swarms, not the web server and does not fit
1045             * into the jurisdiction of the tracker." */
1046            if( peer && e->wasPieceData )
1047                tor->downloadedCur += e->length;
1048
1049            /* update the stats */ 
1050            if( e->wasPieceData )
1051                tr_statsAddDownloaded( tor->session, e->length );
1052
1053            /* update our atom */
1054            if( peer ) {
1055                struct peer_atom * a = getExistingAtom( t, &peer->in_addr );
1056                a->piece_data_time = now;
1057            }
1058
1059            break;
1060        }
1061
1062        case TR_PEER_PEER_PROGRESS:
1063        {
1064            if( peer )
1065            {
1066                struct peer_atom * atom = getExistingAtom( t,
1067                                                           &peer->in_addr );
1068                const int          peerIsSeed = e->progress >= 1.0;
1069                if( peerIsSeed )
1070                {
1071                    tordbg( t, "marking peer %s as a seed",
1072                           tr_peerIoAddrStr( &atom->addr,
1073                                             atom->port ) );
1074                    atom->flags |= ADDED_F_SEED_FLAG;
1075                }
1076                else
1077                {
1078                    tordbg( t, "marking peer %s as a non-seed",
1079                           tr_peerIoAddrStr( &atom->addr,
1080                                             atom->port ) );
1081                    atom->flags &= ~ADDED_F_SEED_FLAG;
1082                }
1083            }
1084            break;
1085        }
1086
1087        case TR_PEER_CLIENT_GOT_BLOCK:
1088        {
1089            tr_torrent *     tor = t->tor;
1090
1091            tr_block_index_t block = _tr_block( tor, e->pieceIndex,
1092                                                e->offset );
1093
1094            tr_cpBlockAdd( tor->completion, block );
1095            decrementPieceRequests( t, e->pieceIndex );
1096
1097            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1098
1099            if( tr_cpPieceIsComplete( tor->completion, e->pieceIndex ) )
1100            {
1101                const tr_piece_index_t p = e->pieceIndex;
1102                const int              ok = tr_ioTestPiece( tor, p );
1103
1104                if( !ok )
1105                {
1106                    tr_torerr( tor,
1107                              _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1108                              (unsigned long)p );
1109                }
1110
1111                tr_torrentSetHasPiece( tor, p, ok );
1112                tr_torrentSetPieceChecked( tor, p, TRUE );
1113                tr_peerMgrSetBlame( tor->session->peerMgr, tor->info.hash, p, ok );
1114
1115                if( !ok )
1116                    gotBadPiece( t, p );
1117                else
1118                {
1119                    int        i, peerCount;
1120                    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1121                    for( i = 0; i < peerCount; ++i )
1122                        tr_peerMsgsHave( peers[i]->msgs, p );
1123                    tr_free( peers );
1124                }
1125
1126                tr_torrentRecheckCompleteness( tor );
1127            }
1128            break;
1129        }
1130
1131        case TR_PEER_ERROR:
1132            if( e->err == EINVAL )
1133            {
1134                addStrike( t, peer );
1135                peer->doPurge = 1;
1136            }
1137            else if( ( e->err == ERANGE )
1138                  || ( e->err == EMSGSIZE )
1139                  || ( e->err == ENOTCONN ) )
1140            {
1141                /* some protocol error from the peer */
1142                peer->doPurge = 1;
1143            }
1144            else /* a local error, such as an IO error */
1145            {
1146                t->tor->error = e->err;
1147                tr_strlcpy( t->tor->errorString,
1148                            tr_strerror( t->tor->error ),
1149                            sizeof( t->tor->errorString ) );
1150                tr_torrentStop( t->tor );
1151            }
1152            break;
1153
1154        default:
1155            assert( 0 );
1156    }
1157
1158    torrentUnlock( t );
1159}
1160
1161static void
1162ensureAtomExists( Torrent *              t,
1163                  const struct in_addr * addr,
1164                  uint16_t               port,
1165                  uint8_t                flags,
1166                  uint8_t                from )
1167{
1168    if( getExistingAtom( t, addr ) == NULL )
1169    {
1170        struct peer_atom * a;
1171        a = tr_new0( struct peer_atom, 1 );
1172        a->addr = *addr;
1173        a->port = port;
1174        a->flags = flags;
1175        a->from = from;
1176        tordbg( t, "got a new atom: %s",
1177               tr_peerIoAddrStr( &a->addr, a->port ) );
1178        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
1179    }
1180}
1181
1182static int
1183getMaxPeerCount( const tr_torrent * tor )
1184{
1185    return tor->maxConnectedPeers;
1186}
1187
1188static int
1189getPeerCount( const Torrent * t )
1190{
1191    return tr_ptrArraySize( t->peers ) + tr_ptrArraySize(
1192               t->outgoingHandshakes );
1193}
1194
1195/* FIXME: this is kind of a mess. */
1196static int
1197myHandshakeDoneCB( tr_handshake *  handshake,
1198                   tr_peerIo *     io,
1199                   int             isConnected,
1200                   const uint8_t * peer_id,
1201                   void *          vmanager )
1202{
1203    int                    ok = isConnected;
1204    int                    success = FALSE;
1205    uint16_t               port;
1206    const struct in_addr * addr;
1207    tr_peerMgr *           manager = (tr_peerMgr*) vmanager;
1208    Torrent *              t;
1209    tr_handshake *         ours;
1210
1211    assert( io );
1212    assert( isConnected == 0 || isConnected == 1 );
1213
1214    t = tr_peerIoHasTorrentHash( io )
1215        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1216        : NULL;
1217
1218    if( tr_peerIoIsIncoming ( io ) )
1219        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
1220                                        handshake, handshakeCompare );
1221    else if( t )
1222        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
1223                                        handshake, handshakeCompare );
1224    else
1225        ours = handshake;
1226
1227    assert( ours );
1228    assert( ours == handshake );
1229
1230    if( t )
1231        torrentLock( t );
1232
1233    addr = tr_peerIoGetAddress( io, &port );
1234
1235    if( !ok || !t || !t->isRunning )
1236    {
1237        if( t )
1238        {
1239            struct peer_atom * atom = getExistingAtom( t, addr );
1240            if( atom )
1241                ++atom->numFails;
1242        }
1243
1244        tr_peerIoFree( io );
1245    }
1246    else /* looking good */
1247    {
1248        struct peer_atom * atom;
1249        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1250        atom = getExistingAtom( t, addr );
1251        atom->time = time( NULL );
1252
1253        if( atom->myflags & MYFLAG_BANNED )
1254        {
1255            tordbg( t, "banned peer %s tried to reconnect",
1256                   tr_peerIoAddrStr( &atom->addr,
1257                                     atom->port ) );
1258            tr_peerIoFree( io );
1259        }
1260        else if( tr_peerIoIsIncoming( io )
1261               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1262
1263        {
1264            tr_peerIoFree( io );
1265        }
1266        else
1267        {
1268            tr_peer * peer = getExistingPeer( t, addr );
1269
1270            if( peer ) /* we already have this peer */
1271            {
1272                tr_peerIoFree( io );
1273            }
1274            else
1275            {
1276                peer = getPeer( t, addr );
1277                tr_free( peer->client );
1278
1279                if( !peer_id )
1280                    peer->client = NULL;
1281                else {
1282                    char client[128];
1283                    tr_clientForId( client, sizeof( client ), peer_id );
1284                    peer->client = tr_strdup( client );
1285                }
1286
1287                peer->port = port;
1288                peer->io = io;
1289                peer->msgs = tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1290                tr_peerIoSetBandwidth( io, peer->bandwidth );
1291
1292                success = TRUE;
1293            }
1294        }
1295    }
1296
1297    if( t )
1298        torrentUnlock( t );
1299
1300    return success;
1301}
1302
1303void
1304tr_peerMgrAddIncoming( tr_peerMgr *     manager,
1305                       struct in_addr * addr,
1306                       uint16_t         port,
1307                       int              socket )
1308{
1309    managerLock( manager );
1310
1311    if( tr_sessionIsAddressBlocked( manager->session, addr ) )
1312    {
1313        tr_dbg( "Banned IP address \"%s\" tried to connect to us",
1314               inet_ntoa( *addr ) );
1315        tr_netClose( socket );
1316    }
1317    else if( getExistingHandshake( manager->incomingHandshakes, addr ) )
1318    {
1319        tr_netClose( socket );
1320    }
1321    else /* we don't have a connetion to them yet... */
1322    {
1323        tr_peerIo *    io;
1324        tr_handshake * handshake;
1325
1326        io = tr_peerIoNewIncoming( manager->session, addr, port, socket );
1327
1328        handshake = tr_handshakeNew( io,
1329                                     manager->session->encryptionMode,
1330                                     myHandshakeDoneCB,
1331                                     manager );
1332
1333        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake,
1334                                 handshakeCompare );
1335    }
1336
1337    managerUnlock( manager );
1338}
1339
1340void
1341tr_peerMgrAddPex( tr_peerMgr *    manager,
1342                  const uint8_t * torrentHash,
1343                  uint8_t         from,
1344                  const tr_pex *  pex )
1345{
1346    Torrent * t;
1347
1348    managerLock( manager );
1349
1350    t = getExistingTorrent( manager, torrentHash );
1351    if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->in_addr ) )
1352        ensureAtomExists( t, &pex->in_addr, pex->port, pex->flags, from );
1353
1354    managerUnlock( manager );
1355}
1356
1357tr_pex *
1358tr_peerMgrCompactToPex( const void *    compact,
1359                        size_t          compactLen,
1360                        const uint8_t * added_f,
1361                        size_t          added_f_len,
1362                        size_t *        pexCount )
1363{
1364    size_t          i;
1365    size_t          n = compactLen / 6;
1366    const uint8_t * walk = compact;
1367    tr_pex *        pex = tr_new0( tr_pex, n );
1368
1369    for( i = 0; i < n; ++i )
1370    {
1371        memcpy( &pex[i].in_addr, walk, 4 ); walk += 4;
1372        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1373        if( added_f && ( n == added_f_len ) )
1374            pex[i].flags = added_f[i];
1375    }
1376
1377    *pexCount = n;
1378    return pex;
1379}
1380
1381/**
1382***
1383**/
1384
1385void
1386tr_peerMgrSetBlame( tr_peerMgr *     manager,
1387                    const uint8_t *  torrentHash,
1388                    tr_piece_index_t pieceIndex,
1389                    int              success )
1390{
1391    if( !success )
1392    {
1393        int        peerCount, i;
1394        Torrent *  t = getExistingTorrent( manager, torrentHash );
1395        tr_peer ** peers;
1396
1397        assert( torrentIsLocked( t ) );
1398
1399        peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1400        for( i = 0; i < peerCount; ++i )
1401        {
1402            tr_peer * peer = peers[i];
1403            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1404            {
1405                tordbg(
1406                    t,
1407                    "peer %s contributed to corrupt piece (%d); now has %d strikes",
1408                    tr_peerIoAddrStr( &peer->in_addr, peer->port ),
1409                    pieceIndex, (int)peer->strikes + 1 );
1410                addStrike( t, peer );
1411            }
1412        }
1413    }
1414}
1415
1416int
1417tr_pexCompare( const void * va,
1418               const void * vb )
1419{
1420    const tr_pex * a = va;
1421    const tr_pex * b = vb;
1422    int            i =
1423        memcmp( &a->in_addr, &b->in_addr, sizeof( struct in_addr ) );
1424
1425    if( i ) return i;
1426    if( a->port < b->port ) return -1;
1427    if( a->port > b->port ) return 1;
1428    return 0;
1429}
1430
1431int tr_pexCompare( const void * a,
1432                   const void * b );
1433
1434static int
1435peerPrefersCrypto( const tr_peer * peer )
1436{
1437    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1438        return TRUE;
1439
1440    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1441        return FALSE;
1442
1443    return tr_peerIoIsEncrypted( peer->io );
1444}
1445
1446int
1447tr_peerMgrGetPeers( tr_peerMgr *    manager,
1448                    const uint8_t * torrentHash,
1449                    tr_pex **       setme_pex )
1450{
1451    int peerCount = 0;
1452    const Torrent *  t;
1453
1454    managerLock( manager );
1455
1456    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1457    if( !t )
1458    {
1459        *setme_pex = NULL;
1460    }
1461    else
1462    {
1463        int i;
1464        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1465        tr_pex * pex = tr_new( tr_pex, peerCount );
1466        tr_pex * walk = pex;
1467
1468        for( i = 0; i < peerCount; ++i, ++walk )
1469        {
1470            const tr_peer * peer = peers[i];
1471            walk->in_addr = peer->in_addr;
1472            walk->port = peer->port;
1473            walk->flags = 0;
1474            if( peerPrefersCrypto( peer ) ) walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1475            if( peer->progress >= 1.0 ) walk->flags |= ADDED_F_SEED_FLAG;
1476        }
1477
1478        assert( ( walk - pex ) == peerCount );
1479        qsort( pex, peerCount, sizeof( tr_pex ), tr_pexCompare );
1480        *setme_pex = pex;
1481    }
1482
1483    managerUnlock( manager );
1484    return peerCount;
1485}
1486
1487static int reconnectPulse( void * vtorrent );
1488
1489static int rechokePulse( void * vtorrent );
1490
1491void
1492tr_peerMgrStartTorrent( tr_peerMgr *    manager,
1493                        const uint8_t * torrentHash )
1494{
1495    Torrent * t;
1496
1497    managerLock( manager );
1498
1499    t = getExistingTorrent( manager, torrentHash );
1500
1501    assert( t );
1502    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1503    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1504
1505    if( !t->isRunning )
1506    {
1507        t->isRunning = 1;
1508
1509        t->reconnectTimer = tr_timerNew( t->manager->session,
1510                                         reconnectPulse, t,
1511                                         RECONNECT_PERIOD_MSEC );
1512
1513        t->rechokeTimer = tr_timerNew( t->manager->session,
1514                                       rechokePulse, t,
1515                                       RECHOKE_PERIOD_MSEC );
1516
1517        reconnectPulse( t );
1518
1519        rechokePulse( t );
1520
1521        if( !tr_ptrArrayEmpty( t->webseeds ) )
1522            refillSoon( t );
1523    }
1524
1525    managerUnlock( manager );
1526}
1527
1528static void
1529stopTorrent( Torrent * t )
1530{
1531    assert( torrentIsLocked( t ) );
1532
1533    t->isRunning = 0;
1534    tr_timerFree( &t->rechokeTimer );
1535    tr_timerFree( &t->reconnectTimer );
1536
1537    /* disconnect the peers. */
1538    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)peerDestructor );
1539    tr_ptrArrayClear( t->peers );
1540
1541    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1542     * which removes the handshake from t->outgoingHandshakes... */
1543    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1544        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1545}
1546
1547void
1548tr_peerMgrStopTorrent( tr_peerMgr *    manager,
1549                       const uint8_t * torrentHash )
1550{
1551    managerLock( manager );
1552
1553    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1554
1555    managerUnlock( manager );
1556}
1557
1558void
1559tr_peerMgrAddTorrent( tr_peerMgr * manager,
1560                      tr_torrent * tor )
1561{
1562    Torrent * t;
1563
1564    managerLock( manager );
1565
1566    assert( tor );
1567    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1568
1569    t = torrentConstructor( manager, tor );
1570    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1571
1572    managerUnlock( manager );
1573}
1574
1575void
1576tr_peerMgrRemoveTorrent( tr_peerMgr *    manager,
1577                         const uint8_t * torrentHash )
1578{
1579    Torrent * t;
1580
1581    managerLock( manager );
1582
1583    t = getExistingTorrent( manager, torrentHash );
1584    assert( t );
1585    stopTorrent( t );
1586    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1587    torrentDestructor( t );
1588
1589    managerUnlock( manager );
1590}
1591
1592void
1593tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1594                               const uint8_t *    torrentHash,
1595                               int8_t *           tab,
1596                               unsigned int       tabCount )
1597{
1598    tr_piece_index_t   i;
1599    const Torrent *    t;
1600    const tr_torrent * tor;
1601    float              interval;
1602    int                isComplete;
1603    int                peerCount;
1604    const tr_peer **   peers;
1605
1606    managerLock( manager );
1607
1608    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1609    tor = t->tor;
1610    interval = tor->info.pieceCount / (float)tabCount;
1611    isComplete = tor
1612                 && ( tr_cpGetStatus ( tor->completion ) == TR_CP_COMPLETE );
1613    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1614
1615    memset( tab, 0, tabCount );
1616
1617    for( i = 0; tor && i < tabCount; ++i )
1618    {
1619        const int piece = i * interval;
1620
1621        if( isComplete || tr_cpPieceIsComplete( tor->completion, piece ) )
1622            tab[i] = -1;
1623        else if( peerCount )
1624        {
1625            int j;
1626            for( j = 0; j < peerCount; ++j )
1627                if( tr_bitfieldHas( peers[j]->have, i ) )
1628                    ++tab[i];
1629        }
1630    }
1631
1632    managerUnlock( manager );
1633}
1634
1635/* Returns the pieces that are available from peers */
1636tr_bitfield*
1637tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1638                        const uint8_t *    torrentHash )
1639{
1640    int           i, size;
1641    Torrent *     t;
1642    tr_peer **    peers;
1643    tr_bitfield * pieces;
1644
1645    managerLock( manager );
1646
1647    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1648    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1649    peers = getConnectedPeers( t, &size );
1650    for( i = 0; i < size; ++i )
1651        tr_bitfieldOr( pieces, peers[i]->have );
1652
1653    managerUnlock( manager );
1654    tr_free( peers );
1655    return pieces;
1656}
1657
1658int
1659tr_peerMgrHasConnections( const tr_peerMgr * manager,
1660                          const uint8_t *    torrentHash )
1661{
1662    int             ret;
1663    const Torrent * t;
1664
1665    managerLock( manager );
1666
1667    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1668    ret = t && ( !tr_ptrArrayEmpty( t->peers )
1669               || !tr_ptrArrayEmpty( t->webseeds ) );
1670
1671    managerUnlock( manager );
1672    return ret;
1673}
1674
1675void
1676tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1677                        const uint8_t *    torrentHash,
1678                        int *              setmePeersKnown,
1679                        int *              setmePeersConnected,
1680                        int *              setmeSeedsConnected,
1681                        int *              setmeWebseedsSendingToUs,
1682                        int *              setmePeersSendingToUs,
1683                        int *              setmePeersGettingFromUs,
1684                        int *              setmePeersFrom )
1685{
1686    int                 i, size;
1687    const Torrent *     t;
1688    const tr_peer **    peers;
1689    const tr_webseed ** webseeds;
1690
1691    managerLock( manager );
1692
1693    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1694    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1695
1696    *setmePeersKnown           = tr_ptrArraySize( t->pool );
1697    *setmePeersConnected       = 0;
1698    *setmeSeedsConnected       = 0;
1699    *setmePeersGettingFromUs   = 0;
1700    *setmePeersSendingToUs     = 0;
1701    *setmeWebseedsSendingToUs  = 0;
1702
1703    for( i = 0; i < TR_PEER_FROM__MAX; ++i )
1704        setmePeersFrom[i] = 0;
1705
1706    for( i = 0; i < size; ++i )
1707    {
1708        const tr_peer *          peer = peers[i];
1709        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1710
1711        if( peer->io == NULL ) /* not connected */
1712            continue;
1713
1714        ++ * setmePeersConnected;
1715
1716        ++setmePeersFrom[atom->from];
1717
1718        if( clientIsDownloadingFrom( peer ) )
1719            ++ * setmePeersSendingToUs;
1720
1721        if( clientIsUploadingTo( peer ) )
1722            ++ * setmePeersGettingFromUs;
1723
1724        if( atom->flags & ADDED_F_SEED_FLAG )
1725            ++ * setmeSeedsConnected;
1726    }
1727
1728    webseeds = (const tr_webseed **) tr_ptrArrayPeek( t->webseeds, &size );
1729    for( i = 0; i < size; ++i )
1730    {
1731        if( tr_webseedIsActive( webseeds[i] ) )
1732            ++ * setmeWebseedsSendingToUs;
1733    }
1734
1735    managerUnlock( manager );
1736}
1737
1738float*
1739tr_peerMgrWebSpeeds( const tr_peerMgr * manager,
1740                     const uint8_t *    torrentHash )
1741{
1742    const Torrent *     t;
1743    const tr_webseed ** webseeds;
1744    int                 i;
1745    int                 webseedCount;
1746    float *             ret;
1747
1748    assert( manager );
1749    managerLock( manager );
1750
1751    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1752    webseeds = (const tr_webseed**) tr_ptrArrayPeek( t->webseeds,
1753                                                     &webseedCount );
1754    assert( webseedCount == t->tor->info.webseedCount );
1755    ret = tr_new0( float, webseedCount );
1756
1757    for( i = 0; i < webseedCount; ++i )
1758        if( !tr_webseedGetSpeed( webseeds[i], &ret[i] ) )
1759            ret[i] = -1.0;
1760
1761    managerUnlock( manager );
1762    return ret;
1763}
1764
1765double
1766tr_peerGetPieceSpeed( const tr_peer    * peer,
1767                      tr_direction       direction )
1768{
1769    assert( peer );
1770    assert( direction==TR_CLIENT_TO_PEER || direction==TR_PEER_TO_CLIENT );
1771
1772    return tr_bandwidthGetPieceSpeed( peer->bandwidth, direction );
1773}
1774
1775
1776struct tr_peer_stat *
1777tr_peerMgrPeerStats( const   tr_peerMgr  * manager,
1778                     const   uint8_t     * torrentHash,
1779                     int                 * setmeCount UNUSED )
1780{
1781    int             i, size;
1782    const Torrent * t;
1783    tr_peer **      peers;
1784    tr_peer_stat *  ret;
1785
1786    assert( manager );
1787    managerLock( manager );
1788
1789    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1790    peers = getConnectedPeers( (Torrent*)t, &size );
1791    ret = tr_new0( tr_peer_stat, size );
1792
1793    for( i = 0; i < size; ++i )
1794    {
1795        char *                   pch;
1796        const tr_peer *          peer = peers[i];
1797        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1798        tr_peer_stat *           stat = ret + i;
1799
1800        tr_netNtop( &peer->in_addr, stat->addr, sizeof( stat->addr ) );
1801        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
1802                   sizeof( stat->client ) );
1803        stat->port               = ntohs( peer->port );
1804        stat->from               = atom->from;
1805        stat->progress           = peer->progress;
1806        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1807        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER );
1808        stat->rateToClient       = tr_peerGetPieceSpeed( peer, TR_PEER_TO_CLIENT );
1809        stat->peerIsChoked       = peer->peerIsChoked;
1810        stat->peerIsInterested   = peer->peerIsInterested;
1811        stat->clientIsChoked     = peer->clientIsChoked;
1812        stat->clientIsInterested = peer->clientIsInterested;
1813        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1814        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1815        stat->isUploadingTo      = clientIsUploadingTo( peer );
1816
1817        pch = stat->flagStr;
1818        if( t->optimistic == peer ) *pch++ = 'O';
1819        if( stat->isDownloadingFrom ) *pch++ = 'D';
1820        else if( stat->clientIsInterested ) *pch++ = 'd';
1821        if( stat->isUploadingTo ) *pch++ = 'U';
1822        else if( stat->peerIsInterested ) *pch++ = 'u';
1823        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ =
1824                'K';
1825        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1826        if( stat->isEncrypted ) *pch++ = 'E';
1827        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1828        if( stat->isIncoming ) *pch++ = 'I';
1829        *pch = '\0';
1830    }
1831
1832    *setmeCount = size;
1833    tr_free( peers );
1834
1835    managerUnlock( manager );
1836    return ret;
1837}
1838
1839/**
1840***
1841**/
1842
1843struct ChokeData
1844{
1845    unsigned int    doUnchoke    : 1;
1846    unsigned int    isInterested : 1;
1847    unsigned int    isChoked     : 1;
1848    int             rate;
1849    tr_peer *       peer;
1850};
1851
1852static int
1853compareChoke( const void * va,
1854              const void * vb )
1855{
1856    const struct ChokeData * a = va;
1857    const struct ChokeData * b = vb;
1858
1859    if( a->rate != b->rate ) /* prefer higher overall speeds */
1860        return a->rate > b->rate ? -1 : 1;
1861
1862    if( a->isChoked != b->isChoked ) /* prefer unchoked */
1863        return a->isChoked ? 1 : -1;
1864
1865    return 0;
1866}
1867
1868static int
1869isNew( const tr_peer * peer )
1870{
1871    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1872}
1873
1874static int
1875isSame( const tr_peer * peer )
1876{
1877    return peer && peer->client && strstr( peer->client, "Transmission" );
1878}
1879
1880/**
1881***
1882**/
1883
1884static void
1885rechoke( Torrent * t )
1886{
1887    int                i, peerCount, size, unchokedInterested;
1888    tr_peer **         peers = getConnectedPeers( t, &peerCount );
1889    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1890    const int          chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
1891
1892    assert( torrentIsLocked( t ) );
1893
1894    /* sort the peers by preference and rate */
1895    for( i = 0, size = 0; i < peerCount; ++i )
1896    {
1897        tr_peer * peer = peers[i];
1898        if( peer->progress >= 1.0 ) /* choke all seeds */
1899            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1900        else if( chokeAll )
1901            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1902        else {
1903            struct ChokeData * n = &choke[size++];
1904            n->peer         = peer;
1905            n->isInterested = peer->peerIsInterested;
1906            n->isChoked     = peer->peerIsChoked;
1907            n->rate         = tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER ) * 1024;
1908        }
1909    }
1910
1911    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
1912
1913    /**
1914     * Reciprocation and number of uploads capping is managed by unchoking
1915     * the N peers which have the best upload rate and are interested.
1916     * This maximizes the client's download rate. These N peers are
1917     * referred to as downloaders, because they are interested in downloading
1918     * from the client.
1919     *
1920     * Peers which have a better upload rate (as compared to the downloaders)
1921     * but aren't interested get unchoked. If they become interested, the
1922     * downloader with the worst upload rate gets choked. If a client has
1923     * a complete file, it uses its upload rate rather than its download
1924     * rate to decide which peers to unchoke.
1925     */
1926    unchokedInterested = 0;
1927    for( i = 0; i < size && unchokedInterested < MAX_UNCHOKED_PEERS; ++i )
1928    {
1929        choke[i].doUnchoke = 1;
1930        if( choke[i].isInterested )
1931            ++unchokedInterested;
1932    }
1933
1934    /* optimistic unchoke */
1935    if( i < size )
1936    {
1937        int                n;
1938        struct ChokeData * c;
1939        tr_ptrArray *      randPool = tr_ptrArrayNew( );
1940
1941        for( ; i < size; ++i )
1942        {
1943            if( choke[i].isInterested )
1944            {
1945                const tr_peer * peer = choke[i].peer;
1946                int             x = 1, y;
1947                if( isNew( peer ) ) x *= 3;
1948                if( isSame( peer ) ) x *= 3;
1949                for( y = 0; y < x; ++y )
1950                    tr_ptrArrayAppend( randPool, &choke[i] );
1951            }
1952        }
1953
1954        if( ( n = tr_ptrArraySize( randPool ) ) )
1955        {
1956            c = tr_ptrArrayNth( randPool, tr_cryptoWeakRandInt( n ) );
1957            c->doUnchoke = 1;
1958            t->optimistic = c->peer;
1959        }
1960
1961        tr_ptrArrayFree( randPool, NULL );
1962    }
1963
1964    for( i = 0; i < size; ++i )
1965        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1966
1967    /* cleanup */
1968    tr_free( choke );
1969    tr_free( peers );
1970}
1971
1972static int
1973rechokePulse( void * vtorrent )
1974{
1975    Torrent * t = vtorrent;
1976
1977    torrentLock( t );
1978    rechoke( t );
1979    torrentUnlock( t );
1980    return TRUE;
1981}
1982
1983/***
1984****
1985****  Life and Death
1986****
1987***/
1988
1989static int
1990shouldPeerBeClosed( const Torrent * t,
1991                    const tr_peer * peer,
1992                    int             peerCount )
1993{
1994    const tr_torrent *       tor = t->tor;
1995    const time_t             now = time( NULL );
1996    const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1997
1998    /* if it's marked for purging, close it */
1999    if( peer->doPurge )
2000    {
2001        tordbg( t, "purging peer %s because its doPurge flag is set",
2002               tr_peerIoAddrStr( &atom->addr,
2003                                 atom->port ) );
2004        return TRUE;
2005    }
2006
2007    /* if we're seeding and the peer has everything we have,
2008     * and enough time has passed for a pex exchange, then disconnect */
2009    if( tr_torrentIsSeed( tor ) )
2010    {
2011        int peerHasEverything;
2012        if( atom->flags & ADDED_F_SEED_FLAG )
2013            peerHasEverything = TRUE;
2014        else if( peer->progress < tr_cpPercentDone( tor->completion ) )
2015            peerHasEverything = FALSE;
2016        else
2017        {
2018            tr_bitfield * tmp =
2019                tr_bitfieldDup( tr_cpPieceBitfield( tor->completion ) );
2020            tr_bitfieldDifference( tmp, peer->have );
2021            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2022            tr_bitfieldFree( tmp );
2023        }
2024        if( peerHasEverything
2025          && ( !tr_torrentAllowsPex( tor ) || ( now - atom->time >= 30 ) ) )
2026        {
2027            tordbg( t, "purging peer %s because we're both seeds",
2028                   tr_peerIoAddrStr( &atom->addr,
2029                                     atom->port ) );
2030            return TRUE;
2031        }
2032    }
2033
2034    /* disconnect if it's been too long since piece data has been transferred.
2035     * this is on a sliding scale based on number of available peers... */
2036    {
2037        const int    relaxStrictnessIfFewerThanN =
2038            (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2039        /* if we have >= relaxIfFewerThan, strictness is 100%.
2040         * if we have zero connections, strictness is 0% */
2041        const float  strictness = peerCount >= relaxStrictnessIfFewerThanN
2042                                  ? 1.0
2043                                  : peerCount /
2044                                  (float)relaxStrictnessIfFewerThanN;
2045        const int    lo = MIN_UPLOAD_IDLE_SECS;
2046        const int    hi = MAX_UPLOAD_IDLE_SECS;
2047        const int    limit = lo + ( ( hi - lo ) * strictness );
2048        const time_t then = peer->pieceDataActivityDate;
2049        const int    idleTime = then ? ( now - then ) : 0;
2050        if( idleTime > limit )
2051        {
2052            tordbg(
2053                t,
2054                "purging peer %s because it's been %d secs since we shared anything",
2055                tr_peerIoAddrStr( &atom->addr, atom->port ), idleTime );
2056            return TRUE;
2057        }
2058    }
2059
2060    return FALSE;
2061}
2062
2063static tr_peer **
2064getPeersToClose( Torrent * t,
2065                 int *     setmeSize )
2066{
2067    int               i, peerCount, outsize;
2068    tr_peer **        peers = (tr_peer**) tr_ptrArrayPeek( t->peers,
2069                                                           &peerCount );
2070    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2071
2072    assert( torrentIsLocked( t ) );
2073
2074    for( i = outsize = 0; i < peerCount; ++i )
2075        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
2076            ret[outsize++] = peers[i];
2077
2078    *setmeSize = outsize;
2079    return ret;
2080}
2081
2082static int
2083compareCandidates( const void * va,
2084                   const void * vb )
2085{
2086    const struct peer_atom * a = *(const struct peer_atom**) va;
2087    const struct peer_atom * b = *(const struct peer_atom**) vb;
2088
2089    /* <Charles> Here we would probably want to try reconnecting to
2090     * peers that had most recently given us data. Lots of users have
2091     * trouble with resets due to their routers and/or ISPs. This way we
2092     * can quickly recover from an unwanted reset. So we sort
2093     * piece_data_time in descending order.
2094     */
2095
2096    if( a->piece_data_time != b->piece_data_time )
2097        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2098
2099    if( a->numFails != b->numFails )
2100        return a->numFails < b->numFails ? -1 : 1;
2101
2102    if( a->time != b->time )
2103        return a->time < b->time ? -1 : 1;
2104
2105    return 0;
2106}
2107
2108static int
2109getReconnectIntervalSecs( const struct peer_atom * atom )
2110{
2111    int          sec;
2112    const time_t now = time( NULL );
2113
2114    /* if we were recently connected to this peer and transferring piece
2115     * data, try to reconnect to them sooner rather that later -- we don't
2116     * want network troubles to get in the way of a good peer. */
2117    if( ( now - atom->piece_data_time ) <=
2118       ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2119        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2120
2121    /* don't allow reconnects more often than our minimum */
2122    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2123        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2124
2125    /* otherwise, the interval depends on how many times we've tried
2126     * and failed to connect to the peer */
2127    else switch( atom->numFails )
2128        {
2129            case 0:
2130                sec = 0; break;
2131
2132            case 1:
2133                sec = 5; break;
2134
2135            case 2:
2136                sec = 2 * 60; break;
2137
2138            case 3:
2139                sec = 15 * 60; break;
2140
2141            case 4:
2142                sec = 30 * 60; break;
2143
2144            case 5:
2145                sec = 60 * 60; break;
2146
2147            default:
2148                sec = 120 * 60; break;
2149        }
2150
2151    return sec;
2152}
2153
2154static struct peer_atom **
2155getPeerCandidates(                               Torrent * t,
2156                                           int * setmeSize )
2157{
2158    int                 i, atomCount, retCount;
2159    struct peer_atom ** atoms;
2160    struct peer_atom ** ret;
2161    const time_t        now = time( NULL );
2162    const int           seed = tr_torrentIsSeed( t->tor );
2163
2164    assert( torrentIsLocked( t ) );
2165
2166    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &atomCount );
2167    ret = tr_new( struct peer_atom*, atomCount );
2168    for( i = retCount = 0; i < atomCount; ++i )
2169    {
2170        int                interval;
2171        struct peer_atom * atom = atoms[i];
2172
2173        /* peer fed us too much bad data ... we only keep it around
2174         * now to weed it out in case someone sends it to us via pex */
2175        if( atom->myflags & MYFLAG_BANNED )
2176            continue;
2177
2178        /* peer was unconnectable before, so we're not going to keep trying.
2179         * this is needs a separate flag from `banned', since if they try
2180         * to connect to us later, we'll let them in */
2181        if( atom->myflags & MYFLAG_UNREACHABLE )
2182            continue;
2183
2184        /* we don't need two connections to the same peer... */
2185        if( peerIsInUse( t, &atom->addr ) )
2186            continue;
2187
2188        /* no need to connect if we're both seeds... */
2189        if( seed && ( atom->flags & ADDED_F_SEED_FLAG ) )
2190            continue;
2191
2192        /* don't reconnect too often */
2193        interval = getReconnectIntervalSecs( atom );
2194        if( ( now - atom->time ) < interval )
2195        {
2196            tordbg(
2197                t,
2198                "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2199                i, tr_peerIoAddrStr( &atom->addr,
2200                                     atom->port ), interval );
2201            continue;
2202        }
2203
2204        /* Don't connect to peers in our blocklist */
2205        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2206            continue;
2207
2208        ret[retCount++] = atom;
2209    }
2210
2211    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2212    *setmeSize = retCount;
2213    return ret;
2214}
2215
2216static int
2217reconnectPulse( void * vtorrent )
2218{
2219    Torrent *     t = vtorrent;
2220    static time_t prevTime = 0;
2221    static int    newConnectionsThisSecond = 0;
2222    time_t        now;
2223
2224    torrentLock( t );
2225
2226    now = time( NULL );
2227    if( prevTime != now )
2228    {
2229        prevTime = now;
2230        newConnectionsThisSecond = 0;
2231    }
2232
2233    if( !t->isRunning )
2234    {
2235        removeAllPeers( t );
2236    }
2237    else
2238    {
2239        int                 i, nCandidates, nBad;
2240        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
2241        struct tr_peer **   connections = getPeersToClose( t, &nBad );
2242
2243        if( nBad || nCandidates )
2244            tordbg(
2245                t, "reconnect pulse for [%s]: %d bad connections, "
2246                   "%d connection candidates, %d atoms, max per pulse is %d",
2247                t->tor->info.name, nBad, nCandidates,
2248                tr_ptrArraySize( t->pool ),
2249                (int)MAX_RECONNECTIONS_PER_PULSE );
2250
2251        /* disconnect some peers.
2252           if we transferred piece data, then they might be good peers,
2253           so reset their `numFails' weight to zero.  otherwise we connected
2254           to them fruitlessly, so mark it as another fail */
2255        for( i = 0; i < nBad; ++i )
2256        {
2257            tr_peer *          peer = connections[i];
2258            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
2259            if( peer->pieceDataActivityDate )
2260                atom->numFails = 0;
2261            else
2262                ++atom->numFails;
2263            tordbg( t, "removing bad peer %s",
2264                   tr_peerIoGetAddrStr( peer->io ) );
2265            removePeer( t, peer );
2266        }
2267
2268        /* add some new ones */
2269        for( i = 0;    ( i < nCandidates )
2270           && ( i < MAX_RECONNECTIONS_PER_PULSE )
2271           && ( getPeerCount( t ) < getMaxPeerCount( t->tor ) )
2272           && ( newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND );
2273             ++i )
2274        {
2275            tr_peerMgr *       mgr = t->manager;
2276            struct peer_atom * atom = candidates[i];
2277            tr_peerIo *        io;
2278
2279            tordbg( t, "Starting an OUTGOING connection with %s",
2280                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2281
2282            io =
2283                tr_peerIoNewOutgoing( mgr->session, &atom->addr, atom->port,
2284                                      t->hash );
2285            if( io == NULL )
2286            {
2287                atom->myflags |= MYFLAG_UNREACHABLE;
2288            }
2289            else
2290            {
2291                tr_handshake * handshake = tr_handshakeNew(
2292                    io,
2293                    mgr->session->
2294                    encryptionMode,
2295                    myHandshakeDoneCB,
2296                    mgr );
2297
2298                assert( tr_peerIoGetTorrentHash( io ) );
2299
2300                ++newConnectionsThisSecond;
2301
2302                tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake,
2303                                         handshakeCompare );
2304            }
2305
2306            atom->time = time( NULL );
2307        }
2308
2309        /* cleanup */
2310        tr_free( connections );
2311        tr_free( candidates );
2312    }
2313
2314    torrentUnlock( t );
2315    return TRUE;
2316}
2317
2318/****
2319*****
2320*****  BANDWIDTH ALLOCATION
2321*****
2322****/
2323
2324static void
2325pumpAllPeers( tr_peerMgr * mgr )
2326{
2327    const int torrentCount = tr_ptrArraySize( mgr->torrents );
2328    int       i, j;
2329
2330    for( i=0; i<torrentCount; ++i )
2331    {
2332        Torrent * t = tr_ptrArrayNth( mgr->torrents, i );
2333        for( j=0; j<tr_ptrArraySize( t->peers ); ++j )
2334        {
2335            tr_peer * peer = tr_ptrArrayNth( t->peers, j );
2336            tr_peerMsgsPulse( peer->msgs );
2337        }
2338    }
2339}
2340
2341static int
2342bandwidthPulse( void * vmgr )
2343{
2344    tr_peerMgr * mgr = vmgr;
2345    managerLock( mgr );
2346
2347    pumpAllPeers( mgr );
2348    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
2349    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
2350    pumpAllPeers( mgr );
2351
2352    managerUnlock( mgr );
2353    return TRUE;
2354}
Note: See TracBrowser for help on using the repository browser.