source: trunk/libtransmission/peer-mgr.c @ 8877

Last change on this file since 8877 was 8877, checked in by charles, 13 years ago

(trunk libT) possible fix for #2301, which is a valgrind complaint about uninitialized memory being written to disk while saving the bencoded .resume file. this commit theorizes that the warning is caused by not zeroing out the tr_peer array before filling it, so the extra bits in tr_addresses' ipv4/ipv6 union were never initialized. if this theory is correct, this commit (which zeros the memory first) should fix the bug.

  • Property svn:keywords set to Date Rev Author Id
File size: 74.2 KB
Line 
1/*
2 * This file Copyright (C) 2007-2009 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 8877 2009-08-07 00:58:34Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h> /* memcpy, memcmp, strstr */
16#include <stdlib.h> /* qsort */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "bandwidth.h"
23#include "bencode.h"
24#include "blocklist.h"
25#include "clients.h"
26#include "completion.h"
27#include "crypto.h"
28#include "fdlimit.h"
29#include "handshake.h"
30#include "inout.h" /* tr_ioTestPiece */
31#include "net.h"
32#include "peer-io.h"
33#include "peer-mgr.h"
34#include "peer-msgs.h"
35#include "ptrarray.h"
36#include "session.h"
37#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
38#include "torrent.h"
39#include "trevent.h"
40#include "utils.h"
41#include "webseed.h"
42
43enum
44{
45    /* how frequently to change which peers are choked */
46    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
47
48    /* minimum interval for refilling peers' request lists */
49    REFILL_PERIOD_MSEC = 400,
50   
51    /* how frequently to reallocate bandwidth */
52    BANDWIDTH_PERIOD_MSEC = 500,
53
54    /* how frequently to age out old piece request lists */
55    REFILL_UPKEEP_PERIOD_MSEC = ( 10 * 1000 ),
56
57    /* how frequently to decide which peers live and die */
58    RECONNECT_PERIOD_MSEC = 500,
59
60    /* when many peers are available, keep idle ones this long */
61    MIN_UPLOAD_IDLE_SECS = ( 30 ),
62
63    /* when few peers are available, keep idle ones this long */
64    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
65
66    /* max # of peers to ask fer per torrent per reconnect pulse */
67    MAX_RECONNECTIONS_PER_PULSE = 16,
68
69    /* max number of peers to ask for per second overall.
70    * this throttle is to avoid overloading the router */
71    MAX_CONNECTIONS_PER_SECOND = 32,
72
73    /* number of bad pieces a peer is allowed to send before we ban them */
74    MAX_BAD_PIECES_PER_PEER = 5,
75
76    /* amount of time to keep a list of request pieces lying around
77       before it's considered too old and needs to be rebuilt */
78    PIECE_LIST_SHELF_LIFE_SECS = 60,
79
80    /* use for bitwise operations w/peer_atom.myflags */
81    MYFLAG_BANNED = 1,
82
83    /* use for bitwise operations w/peer_atom.myflags */
84    /* unreachable for now... but not banned.
85     * if they try to connect to us it's okay */
86    MYFLAG_UNREACHABLE = 2,
87
88    /* the minimum we'll wait before attempting to reconnect to a peer */
89    MINIMUM_RECONNECT_INTERVAL_SECS = 5
90};
91
92
93/**
94***
95**/
96
97enum
98{
99    UPLOAD_ONLY_UKNOWN,
100    UPLOAD_ONLY_YES,
101    UPLOAD_ONLY_NO
102};
103
104/**
105 * Peer information that should be kept even before we've connected and
106 * after we've disconnected.  These are kept in a pool of peer_atoms to decide
107 * which ones would make good candidates for connecting to, and to watch out
108 * for banned peers.
109 *
110 * @see tr_peer
111 * @see tr_peermsgs
112 */
113struct peer_atom
114{
115    uint8_t     from;
116    uint8_t     flags;       /* these match the added_f flags */
117    uint8_t     myflags;     /* flags that aren't defined in added_f */
118    uint8_t     uploadOnly;  /* UPLOAD_ONLY_ */
119    tr_port     port;
120    uint16_t    numFails;
121    tr_address  addr;
122    time_t      time;        /* when the peer's connection status last changed */
123    time_t      piece_data_time;
124};
125
126struct tr_blockIterator
127{
128    time_t expirationDate;
129    struct tr_torrent_peers * t;
130    tr_block_index_t blockIndex, blockCount, *blocks;
131    tr_piece_index_t pieceIndex, pieceCount, *pieces;
132};
133
134typedef struct tr_torrent_peers
135{
136    struct event               refillTimer;
137
138    tr_ptrArray                outgoingHandshakes; /* tr_handshake */
139    tr_ptrArray                pool; /* struct peer_atom */
140    tr_ptrArray                peers; /* tr_peer */
141    tr_ptrArray                webseeds; /* tr_webseed */
142
143    tr_torrent               * tor;
144    tr_peer                  * optimistic; /* the optimistic peer, or NULL if none */
145    struct tr_blockIterator  * refillQueue; /* used in refillPulse() */
146    struct tr_peerMgr        * manager;
147    int                      * pendingRequestCount;
148
149    tr_bool                    isRunning;
150}
151Torrent;
152
153struct tr_peerMgr
154{
155    tr_session      * session;
156    tr_ptrArray       incomingHandshakes; /* tr_handshake */
157    tr_timer        * bandwidthTimer;
158    tr_timer        * rechokeTimer;
159    tr_timer        * reconnectTimer;
160    tr_timer        * refillUpkeepTimer;
161};
162
163#define tordbg( t, ... ) \
164    do { \
165        if( tr_deepLoggingIsActive( ) ) \
166            tr_deepLog( __FILE__, __LINE__, t->tor->info.name, __VA_ARGS__ ); \
167    } while( 0 )
168
169#define dbgmsg( ... ) \
170    do { \
171        if( tr_deepLoggingIsActive( ) ) \
172            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
173    } while( 0 )
174
175/**
176***
177**/
178
179static TR_INLINE void
180managerLock( const struct tr_peerMgr * manager )
181{
182    tr_globalLock( manager->session );
183}
184
185static TR_INLINE void
186managerUnlock( const struct tr_peerMgr * manager )
187{
188    tr_globalUnlock( manager->session );
189}
190
191static TR_INLINE void
192torrentLock( Torrent * torrent )
193{
194    managerLock( torrent->manager );
195}
196
197static TR_INLINE void
198torrentUnlock( Torrent * torrent )
199{
200    managerUnlock( torrent->manager );
201}
202
203static TR_INLINE int
204torrentIsLocked( const Torrent * t )
205{
206    return tr_globalIsLocked( t->manager->session );
207}
208
209/**
210***
211**/
212
213static int
214handshakeCompareToAddr( const void * va, const void * vb )
215{
216    const tr_handshake * a = va;
217
218    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
219}
220
221static int
222handshakeCompare( const void * a, const void * b )
223{
224    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
225}
226
227static tr_handshake*
228getExistingHandshake( tr_ptrArray      * handshakes,
229                      const tr_address * addr )
230{
231    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
232}
233
234static int
235comparePeerAtomToAddress( const void * va, const void * vb )
236{
237    const struct peer_atom * a = va;
238
239    return tr_compareAddresses( &a->addr, vb );
240}
241
242static int
243comparePeerAtoms( const void * va, const void * vb )
244{
245    const struct peer_atom * b = vb;
246
247    return comparePeerAtomToAddress( va, &b->addr );
248}
249
250/**
251***
252**/
253
254static Torrent*
255getExistingTorrent( tr_peerMgr *    manager,
256                    const uint8_t * hash )
257{
258    tr_torrent * tor = tr_torrentFindFromHash( manager->session, hash );
259
260    return tor == NULL ? NULL : tor->torrentPeers;
261}
262
263static int
264peerCompare( const void * va, const void * vb )
265{
266    const tr_peer * a = va;
267    const tr_peer * b = vb;
268
269    return tr_compareAddresses( &a->addr, &b->addr );
270}
271
272static int
273peerCompareToAddr( const void * va, const void * vb )
274{
275    const tr_peer * a = va;
276
277    return tr_compareAddresses( &a->addr, vb );
278}
279
280static tr_peer*
281getExistingPeer( Torrent          * torrent,
282                 const tr_address * addr )
283{
284    assert( torrentIsLocked( torrent ) );
285    assert( addr );
286
287    return tr_ptrArrayFindSorted( &torrent->peers, addr, peerCompareToAddr );
288}
289
290static struct peer_atom*
291getExistingAtom( const Torrent    * t,
292                 const tr_address * addr )
293{
294    Torrent * tt = (Torrent*)t;
295    assert( torrentIsLocked( t ) );
296    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
297}
298
299static tr_bool
300peerIsInUse( const Torrent    * ct,
301             const tr_address * addr )
302{
303    Torrent * t = (Torrent*) ct;
304
305    assert( torrentIsLocked ( t ) );
306
307    return getExistingPeer( t, addr )
308        || getExistingHandshake( &t->outgoingHandshakes, addr )
309        || getExistingHandshake( &t->manager->incomingHandshakes, addr );
310}
311
312static tr_peer*
313peerConstructor( const tr_address * addr )
314{
315    tr_peer * p;
316    p = tr_new0( tr_peer, 1 );
317    p->addr = *addr;
318    return p;
319}
320
321static tr_peer*
322getPeer( Torrent          * torrent,
323         const tr_address * addr )
324{
325    tr_peer * peer;
326
327    assert( torrentIsLocked( torrent ) );
328
329    peer = getExistingPeer( torrent, addr );
330
331    if( peer == NULL )
332    {
333        peer = peerConstructor( addr );
334        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
335    }
336
337    return peer;
338}
339
340static void
341peerDestructor( tr_peer * peer )
342{
343    assert( peer );
344
345    if( peer->msgs != NULL )
346    {
347        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
348        tr_peerMsgsFree( peer->msgs );
349    }
350
351    tr_peerIoClear( peer->io );
352    tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
353
354    tr_bitfieldFree( peer->have );
355    tr_bitfieldFree( peer->blame );
356    tr_free( peer->client );
357
358    tr_free( peer );
359}
360
361static void
362removePeer( Torrent * t, tr_peer * peer )
363{
364    tr_peer * removed;
365    struct peer_atom * atom = peer->atom;
366
367    assert( torrentIsLocked( t ) );
368    assert( atom );
369
370    atom->time = time( NULL );
371
372    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
373    assert( removed == peer );
374    peerDestructor( removed );
375}
376
377static void
378removeAllPeers( Torrent * t )
379{
380    while( !tr_ptrArrayEmpty( &t->peers ) )
381        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
382}
383
384static void blockIteratorFree( struct tr_blockIterator ** inout );
385
386static void
387torrentDestructor( void * vt )
388{
389    Torrent * t = vt;
390
391    assert( t );
392    assert( !t->isRunning );
393    assert( torrentIsLocked( t ) );
394    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
395    assert( tr_ptrArrayEmpty( &t->peers ) );
396
397    evtimer_del( &t->refillTimer );
398
399    blockIteratorFree( &t->refillQueue );
400    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
401    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
402    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
403    tr_ptrArrayDestruct( &t->peers, NULL );
404
405    tr_free( t->pendingRequestCount );
406    tr_free( t );
407}
408
409
410static void refillPulse( int, short, void* );
411
412static void peerCallbackFunc( void * vpeer,
413                              void * vevent,
414                              void * vt );
415
416static Torrent*
417torrentConstructor( tr_peerMgr * manager,
418                    tr_torrent * tor )
419{
420    int       i;
421    Torrent * t;
422
423    t = tr_new0( Torrent, 1 );
424    t->manager = manager;
425    t->tor = tor;
426    t->pool = TR_PTR_ARRAY_INIT;
427    t->peers = TR_PTR_ARRAY_INIT;
428    t->webseeds = TR_PTR_ARRAY_INIT;
429    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
430    evtimer_set( &t->refillTimer, refillPulse, t );
431
432
433    for( i = 0; i < tor->info.webseedCount; ++i )
434    {
435        tr_webseed * w =
436            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
437        tr_ptrArrayAppend( &t->webseeds, w );
438    }
439
440    return t;
441}
442
443
444static int bandwidthPulse ( void * vmgr );
445static int rechokePulse   ( void * vmgr );
446static int reconnectPulse ( void * vmgr );
447static int refillUpkeep   ( void * vmgr );
448
449tr_peerMgr*
450tr_peerMgrNew( tr_session * session )
451{
452    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
453    m->session = session;
454    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
455    return m;
456}
457
458static void
459deleteTimers( struct tr_peerMgr * m )
460{
461    if( m->bandwidthTimer )
462        tr_timerFree( &m->bandwidthTimer );
463
464    if( m->rechokeTimer )
465        tr_timerFree( &m->rechokeTimer );
466
467    if( m->reconnectTimer )
468        tr_timerFree( &m->reconnectTimer );
469
470    if( m->refillUpkeepTimer )
471        tr_timerFree( &m->refillUpkeepTimer );
472}
473
474void
475tr_peerMgrFree( tr_peerMgr * manager )
476{
477    managerLock( manager );
478
479    deleteTimers( manager );
480
481    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
482     * the item from manager->handshakes, so this is a little roundabout... */
483    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
484        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
485
486    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
487
488    managerUnlock( manager );
489    tr_free( manager );
490}
491
492static int
493clientIsDownloadingFrom( const tr_peer * peer )
494{
495    return peer->clientIsInterested && !peer->clientIsChoked;
496}
497
498static int
499clientIsUploadingTo( const tr_peer * peer )
500{
501    return peer->peerIsInterested && !peer->peerIsChoked;
502}
503
504/***
505****
506***/
507
508tr_bool
509tr_peerMgrPeerIsSeed( const tr_torrent  * tor,
510                      const tr_address  * addr )
511{
512    tr_bool isSeed = FALSE;
513    const Torrent * t = tor->torrentPeers;
514    const struct peer_atom * atom = getExistingAtom( t, addr );
515
516    if( atom )
517        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
518
519    return isSeed;
520}
521
522/****
523*****
524*****  REFILL
525*****
526****/
527
528static void
529assertValidPiece( Torrent * t, tr_piece_index_t piece )
530{
531    assert( t );
532    assert( t->tor );
533    assert( piece < t->tor->info.pieceCount );
534}
535
536static int
537getPieceRequests( Torrent * t, tr_piece_index_t piece )
538{
539    assertValidPiece( t, piece );
540
541    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
542}
543
544static void
545incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
546{
547    assertValidPiece( t, piece );
548
549    if( t->pendingRequestCount == NULL )
550        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
551    t->pendingRequestCount[piece]++;
552}
553
554static void
555decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
556{
557    assertValidPiece( t, piece );
558
559    if( t->pendingRequestCount )
560        t->pendingRequestCount[piece]--;
561}
562
563struct tr_refill_piece
564{
565    tr_priority_t    priority;
566    uint32_t         piece;
567    uint32_t         peerCount;
568    int              random;
569    int              pendingRequestCount;
570    int              missingBlockCount;
571};
572
573static int
574compareRefillPiece( const void * aIn, const void * bIn )
575{
576    const struct tr_refill_piece * a = aIn;
577    const struct tr_refill_piece * b = bIn;
578
579    /* if one piece has a higher priority, it goes first */
580    if( a->priority != b->priority )
581        return a->priority > b->priority ? -1 : 1;
582
583    /* have a per-priority endgame */
584    if( a->pendingRequestCount != b->pendingRequestCount )
585        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
586
587    /* fewer missing pieces goes first */
588    if( a->missingBlockCount != b->missingBlockCount )
589        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
590
591    /* otherwise if one has fewer peers, it goes first */
592    if( a->peerCount != b->peerCount )
593        return a->peerCount < b->peerCount ? -1 : 1;
594
595    /* otherwise go with our random seed */
596    if( a->random != b->random )
597        return a->random < b->random ? -1 : 1;
598
599    return 0;
600}
601
602static tr_piece_index_t *
603getPreferredPieces( Torrent * t, tr_piece_index_t * pieceCount )
604{
605    const tr_torrent  * tor = t->tor;
606    const tr_info     * inf = &tor->info;
607    tr_piece_index_t    i;
608    tr_piece_index_t    poolSize = 0;
609    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
610    int                 peerCount;
611    const tr_peer    ** peers;
612
613    assert( torrentIsLocked( t ) );
614
615    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
616    peerCount = tr_ptrArraySize( &t->peers );
617
618    /* make a list of the pieces that we want but don't have */
619    for( i = 0; i < inf->pieceCount; ++i )
620        if( !tor->info.pieces[i].dnd
621                && !tr_cpPieceIsComplete( &tor->completion, i ) )
622            pool[poolSize++] = i;
623
624    /* sort the pool by which to request next */
625    if( poolSize > 1 )
626    {
627        tr_piece_index_t j;
628        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
629
630        for( j = 0; j < poolSize; ++j )
631        {
632            int k;
633            const tr_piece_index_t piece = pool[j];
634            struct tr_refill_piece * setme = p + j;
635
636            setme->piece = piece;
637            setme->priority = inf->pieces[piece].priority;
638            setme->peerCount = 0;
639            setme->random = tr_cryptoWeakRandInt( INT_MAX );
640            setme->pendingRequestCount = getPieceRequests( t, piece );
641            setme->missingBlockCount
642                         = tr_cpMissingBlocksInPiece( &tor->completion, piece );
643
644            for( k = 0; k < peerCount; ++k )
645            {
646                const tr_peer * peer = peers[k];
647                if( peer->peerIsInterested
648                        && !peer->clientIsChoked
649                        && tr_bitfieldHas( peer->have, piece ) )
650                    ++setme->peerCount;
651            }
652        }
653
654        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
655               compareRefillPiece );
656
657        for( j = 0; j < poolSize; ++j )
658            pool[j] = p[j].piece;
659
660        tr_free( p );
661    }
662
663    *pieceCount = poolSize;
664    return pool;
665}
666
667static struct tr_blockIterator*
668blockIteratorNew( Torrent * t )
669{
670    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
671    i->expirationDate = time( NULL ) + PIECE_LIST_SHELF_LIFE_SECS;
672    i->t = t;
673    i->pieces = getPreferredPieces( t, &i->pieceCount );
674    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCountInPiece );
675    tordbg( t, "creating new refill queue.. it contains %"PRIu32" pieces", i->pieceCount );
676    return i;
677}
678
679static tr_bool
680blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
681{
682    tr_bool found;
683    Torrent * t = i->t;
684    tr_torrent * tor = t->tor;
685
686    while( ( i->blockIndex == i->blockCount )
687        && ( i->pieceIndex < i->pieceCount ) )
688    {
689        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
690        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
691        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
692        tr_block_index_t block;
693
694        assert( index < tor->info.pieceCount );
695
696        i->blockCount = 0;
697        i->blockIndex = 0;
698        for( block=b; block!=e; ++block )
699            if( !tr_cpBlockIsCompleteFast( &tor->completion, block ) )
700                i->blocks[i->blockCount++] = block;
701    }
702
703    assert( i->blockCount <= tor->blockCountInPiece );
704
705    if(( found = ( i->blockIndex < i->blockCount )))
706        *setme = i->blocks[i->blockIndex++];
707
708    return found;
709}
710
711static void
712blockIteratorSkipCurrentPiece( struct tr_blockIterator * i )
713{
714    i->blockIndex = i->blockCount;
715}
716
717static void
718blockIteratorFree( struct tr_blockIterator ** inout )
719{
720    struct tr_blockIterator * it = *inout;
721
722    if( it != NULL )
723    {
724        tr_free( it->blocks );
725        tr_free( it->pieces );
726        tr_free( it );
727    }
728
729    *inout = NULL;
730}
731
732static tr_peer**
733getPeersUploadingToClient( Torrent * t,
734                           int *     setmeCount )
735{
736    int j;
737    int peerCount = 0;
738    int retCount = 0;
739    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
740    tr_peer ** ret = tr_new( tr_peer *, peerCount );
741
742    j = 0; /* this is a temporary test to make sure we walk through all the peers */
743    if( peerCount )
744    {
745        /* Get a list of peers we're downloading from.
746           Pick a different starting point each time so all peers
747           get a chance at being the first in line */
748        const int fencepost = tr_cryptoWeakRandInt( peerCount );
749        int i = fencepost;
750        do {
751            if( clientIsDownloadingFrom( peers[i] ) )
752                ret[retCount++] = peers[i];
753            i = ( i + 1 ) % peerCount;
754            ++j;
755        } while( i != fencepost );
756    }
757    assert( j == peerCount );
758    *setmeCount = retCount;
759    return ret;
760}
761
762static uint32_t
763getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
764{
765    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
766    const uint64_t blockPos = tor->blockSize * b;
767    assert( blockPos >= piecePos );
768    return (uint32_t)( blockPos - piecePos );
769}
770
771static int
772refillUpkeep( void * vmgr )
773{
774    tr_torrent * tor = NULL;
775    tr_peerMgr * mgr = vmgr;
776    time_t now;
777    managerLock( mgr );
778
779    now = time( NULL );
780    while(( tor = tr_torrentNext( mgr->session, tor ))) {
781        Torrent * t = tor->torrentPeers;
782        if( t && t->refillQueue && ( t->refillQueue->expirationDate <= now ) ) {
783            tordbg( t, "refill queue is past its shelf date; discarding." );
784            blockIteratorFree( &t->refillQueue );
785        }
786    }
787
788    managerUnlock( mgr );
789    return TRUE;
790}
791
792static void
793refillPulse( int fd UNUSED, short type UNUSED, void * vtorrent )
794{
795    tr_block_index_t block;
796    int peerCount;
797    int webseedCount;
798    tr_peer ** peers;
799    tr_webseed ** webseeds;
800    Torrent * t = vtorrent;
801    tr_torrent * tor = t->tor;
802    tr_bool hasNext = TRUE;
803
804    if( !t->isRunning )
805        return;
806    if( tr_torrentIsSeed( t->tor ) )
807        return;
808
809    torrentLock( t );
810    tordbg( t, "Refilling Request Buffers..." );
811
812    if( t->refillQueue == NULL )
813        t->refillQueue = blockIteratorNew( t );
814
815    peers = getPeersUploadingToClient( t, &peerCount );
816    webseedCount = tr_ptrArraySize( &t->webseeds );
817    webseeds = tr_memdup( tr_ptrArrayBase( &t->webseeds ),
818                          webseedCount * sizeof( tr_webseed* ) );
819
820    while( ( webseedCount || peerCount )
821        && (( hasNext = blockIteratorNext( t->refillQueue, &block ))) )
822    {
823        int j;
824        tr_bool handled = FALSE;
825
826        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
827        const uint32_t offset = getBlockOffsetInPiece( tor, block );
828        const uint32_t length = tr_torBlockCountBytes( tor, block );
829
830        assert( block < tor->blockCount );
831
832        /* find a peer who can ask for this block */
833        for( j=0; !handled && j<peerCount; )
834        {
835            const tr_addreq_t val = tr_peerMsgsAddRequest( peers[j]->msgs, index, offset, length );
836            switch( val )
837            {
838                case TR_ADDREQ_FULL:
839                case TR_ADDREQ_CLIENT_CHOKED:
840                    peers[j] = peers[--peerCount];
841                    break;
842
843                case TR_ADDREQ_MISSING:
844                case TR_ADDREQ_DUPLICATE:
845                    ++j;
846                    break;
847
848                case TR_ADDREQ_OK:
849                    incrementPieceRequests( t, index );
850                    handled = TRUE;
851                    break;
852
853                default:
854                    assert( 0 && "unhandled value" );
855                    break;
856            }
857        }
858
859        /* maybe one of the webseeds can do it */
860        for( j=0; !handled && j<webseedCount; )
861        {
862            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j], index, offset, length );
863            switch( val )
864            {
865                case TR_ADDREQ_FULL:
866                    webseeds[j] = webseeds[--webseedCount];
867                    break;
868
869                case TR_ADDREQ_OK:
870                    incrementPieceRequests( t, index );
871                    handled = TRUE;
872                    break;
873
874                default:
875                    assert( 0 && "unhandled value" );
876                    break;
877            }
878        }
879
880        if( !handled )
881            blockIteratorSkipCurrentPiece( t->refillQueue );
882    }
883
884    /* cleanup */
885    tr_free( webseeds );
886    tr_free( peers );
887
888    if( !hasNext ) {
889        tordbg( t, "refill queue has no more blocks to request... freeing (webseed count: %d, peer count: %d)", webseedCount, peerCount );
890        blockIteratorFree( &t->refillQueue );
891    }
892
893    torrentUnlock( t );
894}
895
896static void
897broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
898{
899    size_t i;
900    size_t peerCount;
901    tr_peer ** peers;
902
903    assert( torrentIsLocked( t ) );
904
905    tordbg( t, "got a block; cancelling any duplicate requests from peers %"PRIu32":%"PRIu32"->%"PRIu32, index, offset, length );
906
907    peerCount = tr_ptrArraySize( &t->peers );
908    peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
909    for( i=0; i<peerCount; ++i )
910        if( peers[i]->msgs )
911            tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
912}
913
914static void
915addStrike( Torrent * t, tr_peer * peer )
916{
917    tordbg( t, "increasing peer %s strike count to %d",
918            tr_peerIoAddrStr( &peer->addr,
919                              peer->port ), peer->strikes + 1 );
920
921    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
922    {
923        struct peer_atom * atom = peer->atom;
924        atom->myflags |= MYFLAG_BANNED;
925        peer->doPurge = 1;
926        tordbg( t, "banning peer %s", tr_peerIoAddrStr( &atom->addr, atom->port ) );
927    }
928}
929
930static void
931gotBadPiece( Torrent *        t,
932             tr_piece_index_t pieceIndex )
933{
934    tr_torrent *   tor = t->tor;
935    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
936
937    tor->corruptCur += byteCount;
938    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
939}
940
941static void
942refillSoon( Torrent * t )
943{
944    if( !evtimer_pending( &t->refillTimer, NULL ) )
945        tr_timerAdd( &t->refillTimer, 0, REFILL_PERIOD_MSEC );
946}
947
948static void
949peerSuggestedPiece( Torrent            * t UNUSED,
950                    tr_peer            * peer UNUSED,
951                    tr_piece_index_t     pieceIndex UNUSED,
952                    int                  isFastAllowed UNUSED )
953{
954#if 0
955    assert( t );
956    assert( peer );
957    assert( peer->msgs );
958
959    /* is this a valid piece? */
960    if(  pieceIndex >= t->tor->info.pieceCount )
961        return;
962
963    /* don't ask for it if we've already got it */
964    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
965        return;
966
967    /* don't ask for it if they don't have it */
968    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
969        return;
970
971    /* don't ask for it if we're choked and it's not fast */
972    if( !isFastAllowed && peer->clientIsChoked )
973        return;
974
975    /* request the blocks that we don't have in this piece */
976    {
977        tr_block_index_t block;
978        const tr_torrent * tor = t->tor;
979        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
980        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
981
982        for( block=start; block<end; ++block )
983        {
984            if( !tr_cpBlockIsComplete( tor->completion, block ) )
985            {
986                const uint32_t offset = getBlockOffsetInPiece( tor, block );
987                const uint32_t length = tr_torBlockCountBytes( tor, block );
988                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
989                incrementPieceRequests( t, pieceIndex );
990            }
991        }
992    }
993#endif
994}
995
996static void
997peerCallbackFunc( void * vpeer, void * vevent, void * vt )
998{
999    tr_peer * peer = vpeer; /* may be NULL if peer is a webseed */
1000    Torrent * t = vt;
1001    const tr_peer_event * e = vevent;
1002
1003    torrentLock( t );
1004
1005    switch( e->eventType )
1006    {
1007        case TR_PEER_UPLOAD_ONLY:
1008            /* update our atom */
1009            if( peer )
1010                peer->atom->uploadOnly = e->uploadOnly ? UPLOAD_ONLY_YES : UPLOAD_ONLY_NO;
1011            break;
1012
1013        case TR_PEER_NEED_REQ:
1014            refillSoon( t );
1015            break;
1016
1017        case TR_PEER_CANCEL:
1018            decrementPieceRequests( t, e->pieceIndex );
1019            break;
1020
1021        case TR_PEER_PEER_GOT_DATA:
1022        {
1023            const time_t now = time( NULL );
1024            tr_torrent * tor = t->tor;
1025
1026            tr_torrentSetActivityDate( tor, now );
1027
1028            if( e->wasPieceData )
1029                tor->uploadedCur += e->length;
1030
1031            /* update the stats */
1032            if( e->wasPieceData )
1033                tr_statsAddUploaded( tor->session, e->length );
1034
1035            /* update our atom */
1036            if( peer && e->wasPieceData )
1037                peer->atom->piece_data_time = now;
1038
1039            tor->needsSeedRatioCheck = TRUE;
1040
1041            break;
1042        }
1043
1044        case TR_PEER_CLIENT_GOT_SUGGEST:
1045            if( peer )
1046                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1047            break;
1048
1049        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1050            if( peer )
1051                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1052            break;
1053
1054        case TR_PEER_CLIENT_GOT_DATA:
1055        {
1056            const time_t now = time( NULL );
1057            tr_torrent * tor = t->tor;
1058
1059            tr_torrentSetActivityDate( tor, now );
1060
1061            /* only add this to downloadedCur if we got it from a peer --
1062             * webseeds shouldn't count against our ratio.  As one tracker
1063             * admin put it, "Those pieces are downloaded directly from the
1064             * content distributor, not the peers, it is the tracker's job
1065             * to manage the swarms, not the web server and does not fit
1066             * into the jurisdiction of the tracker." */
1067            if( peer && e->wasPieceData )
1068                tor->downloadedCur += e->length;
1069
1070            /* update the stats */ 
1071            if( e->wasPieceData )
1072                tr_statsAddDownloaded( tor->session, e->length );
1073
1074            /* update our atom */
1075            if( peer && e->wasPieceData )
1076                peer->atom->piece_data_time = now;
1077
1078            break;
1079        }
1080
1081        case TR_PEER_PEER_PROGRESS:
1082        {
1083            if( peer )
1084            {
1085                struct peer_atom * atom = peer->atom;
1086                const int peerIsSeed = e->progress >= 1.0;
1087                if( peerIsSeed ) {
1088                    tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1089                    atom->flags |= ADDED_F_SEED_FLAG;
1090                } else {
1091                    tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1092                    atom->flags &= ~ADDED_F_SEED_FLAG;
1093                }
1094            }
1095            break;
1096        }
1097
1098        case TR_PEER_CLIENT_GOT_BLOCK:
1099        {
1100            tr_torrent * tor = t->tor;
1101
1102            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1103
1104            tr_cpBlockAdd( &tor->completion, block );
1105            decrementPieceRequests( t, e->pieceIndex );
1106
1107            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1108
1109            if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1110            {
1111                const tr_piece_index_t p = e->pieceIndex;
1112                const tr_bool ok = tr_ioTestPiece( tor, p, NULL, 0 );
1113
1114                if( !ok )
1115                {
1116                    tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1117                               (unsigned long)p );
1118                }
1119
1120                tr_torrentSetHasPiece( tor, p, ok );
1121                tr_torrentSetPieceChecked( tor, p, TRUE );
1122                tr_peerMgrSetBlame( tor, p, ok );
1123
1124                if( !ok )
1125                {
1126                    gotBadPiece( t, p );
1127                }
1128                else
1129                {
1130                    int i;
1131                    int peerCount;
1132                    tr_peer ** peers;
1133                    tr_file_index_t fileIndex;
1134
1135                    peerCount = tr_ptrArraySize( &t->peers );
1136                    peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1137                    for( i=0; i<peerCount; ++i )
1138                        tr_peerMsgsHave( peers[i]->msgs, p );
1139
1140                    for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex )
1141                    {
1142                        const tr_file * file = &tor->info.files[fileIndex];
1143                        if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) && tr_cpFileIsComplete( &tor->completion, fileIndex ) )
1144                        {
1145                            char * path = tr_buildPath( tor->downloadDir, file->name, NULL );
1146                            tordbg( t, "closing recently-completed file \"%s\"", path );
1147                            tr_fdFileClose( path );
1148                            tr_free( path );
1149                        }
1150                    }
1151                }
1152
1153                tr_torrentRecheckCompleteness( tor );
1154            }
1155            break;
1156        }
1157
1158        case TR_PEER_ERROR:
1159            if( e->err == EINVAL )
1160            {
1161                addStrike( t, peer );
1162                peer->doPurge = 1;
1163                tordbg( t, "setting %s doPurge flag because we got an EINVAL error", tr_peerIoAddrStr( &peer->addr, peer->port ) );
1164            }
1165            else if( ( e->err == ERANGE )
1166                  || ( e->err == EMSGSIZE )
1167                  || ( e->err == ENOTCONN ) )
1168            {
1169                /* some protocol error from the peer */
1170                peer->doPurge = 1;
1171                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error", tr_peerIoAddrStr( &peer->addr, peer->port ) );
1172            }
1173            else /* a local error, such as an IO error */
1174            {
1175                t->tor->error = TR_STAT_LOCAL_ERROR;
1176                tr_strlcpy( t->tor->errorString,
1177                            tr_strerror( e->err ),
1178                            sizeof( t->tor->errorString ) );
1179                tr_torrentStop( t->tor );
1180            }
1181            break;
1182
1183        default:
1184            assert( 0 );
1185    }
1186
1187    torrentUnlock( t );
1188}
1189
1190static void
1191ensureAtomExists( Torrent          * t,
1192                  const tr_address * addr,
1193                  tr_port            port,
1194                  uint8_t            flags,
1195                  uint8_t            from )
1196{
1197    if( getExistingAtom( t, addr ) == NULL )
1198    {
1199        struct peer_atom * a;
1200        a = tr_new0( struct peer_atom, 1 );
1201        a->addr = *addr;
1202        a->port = port;
1203        a->flags = flags;
1204        a->from = from;
1205        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr( &a->addr, a->port ) );
1206        tr_ptrArrayInsertSorted( &t->pool, a, comparePeerAtoms );
1207    }
1208}
1209
1210static int
1211getMaxPeerCount( const tr_torrent * tor )
1212{
1213    return tor->maxConnectedPeers;
1214}
1215
1216static int
1217getPeerCount( const Torrent * t )
1218{
1219    return tr_ptrArraySize( &t->peers );/* + tr_ptrArraySize( &t->outgoingHandshakes ); */
1220}
1221
1222/* FIXME: this is kind of a mess. */
1223static tr_bool
1224myHandshakeDoneCB( tr_handshake  * handshake,
1225                   tr_peerIo     * io,
1226                   tr_bool         isConnected,
1227                   const uint8_t * peer_id,
1228                   void          * vmanager )
1229{
1230    tr_bool            ok = isConnected;
1231    tr_bool            success = FALSE;
1232    tr_port            port;
1233    const tr_address * addr;
1234    tr_peerMgr       * manager = vmanager;
1235    Torrent          * t;
1236    tr_handshake     * ours;
1237
1238    assert( io );
1239    assert( tr_isBool( ok ) );
1240
1241    t = tr_peerIoHasTorrentHash( io )
1242        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1243        : NULL;
1244
1245    if( tr_peerIoIsIncoming ( io ) )
1246        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1247                                        handshake, handshakeCompare );
1248    else if( t )
1249        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1250                                        handshake, handshakeCompare );
1251    else
1252        ours = handshake;
1253
1254    assert( ours );
1255    assert( ours == handshake );
1256
1257    if( t )
1258        torrentLock( t );
1259
1260    addr = tr_peerIoGetAddress( io, &port );
1261
1262    if( !ok || !t || !t->isRunning )
1263    {
1264        if( t )
1265        {
1266            struct peer_atom * atom = getExistingAtom( t, addr );
1267            if( atom )
1268                ++atom->numFails;
1269        }
1270    }
1271    else /* looking good */
1272    {
1273        struct peer_atom * atom;
1274        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1275        atom = getExistingAtom( t, addr );
1276        atom->time = time( NULL );
1277        atom->piece_data_time = 0;
1278
1279        if( atom->myflags & MYFLAG_BANNED )
1280        {
1281            tordbg( t, "banned peer %s tried to reconnect",
1282                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
1283        }
1284        else if( tr_peerIoIsIncoming( io )
1285               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1286
1287        {
1288        }
1289        else
1290        {
1291            tr_peer * peer = getExistingPeer( t, addr );
1292
1293            if( peer ) /* we already have this peer */
1294            {
1295            }
1296            else
1297            {
1298                peer = getPeer( t, addr );
1299                tr_free( peer->client );
1300
1301                if( !peer_id )
1302                    peer->client = NULL;
1303                else {
1304                    char client[128];
1305                    tr_clientForId( client, sizeof( client ), peer_id );
1306                    peer->client = tr_strdup( client );
1307                }
1308
1309                peer->port = port;
1310                peer->atom = atom;
1311                peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is
1312                                                                balanced by our unref in peerDestructor()  */
1313                tr_peerIoSetParent( peer->io, t->tor->bandwidth );
1314                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1315
1316                success = TRUE;
1317            }
1318        }
1319    }
1320
1321    if( t )
1322        torrentUnlock( t );
1323
1324    return success;
1325}
1326
1327void
1328tr_peerMgrAddIncoming( tr_peerMgr * manager,
1329                       tr_address * addr,
1330                       tr_port      port,
1331                       int          socket )
1332{
1333    managerLock( manager );
1334
1335    if( tr_sessionIsAddressBlocked( manager->session, addr ) )
1336    {
1337        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
1338        tr_netClose( socket );
1339    }
1340    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
1341    {
1342        tr_netClose( socket );
1343    }
1344    else /* we don't have a connetion to them yet... */
1345    {
1346        tr_peerIo *    io;
1347        tr_handshake * handshake;
1348
1349        io = tr_peerIoNewIncoming( manager->session, manager->session->bandwidth, addr, port, socket );
1350
1351        handshake = tr_handshakeNew( io,
1352                                     manager->session->encryptionMode,
1353                                     myHandshakeDoneCB,
1354                                     manager );
1355
1356        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
1357
1358        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
1359                                 handshakeCompare );
1360    }
1361
1362    managerUnlock( manager );
1363}
1364
1365static tr_bool
1366tr_isPex( const tr_pex * pex )
1367{
1368    return pex && tr_isAddress( &pex->addr );
1369}
1370
1371void
1372tr_peerMgrAddPex( tr_torrent   *  tor,
1373                  uint8_t         from,
1374                  const tr_pex *  pex )
1375{
1376    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
1377    {
1378        Torrent * t = tor->torrentPeers;
1379        managerLock( t->manager );
1380
1381        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
1382            if( tr_isValidPeerAddress( &pex->addr, pex->port ) )
1383                ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from );
1384
1385        managerUnlock( t->manager );
1386    }
1387}
1388
1389tr_pex *
1390tr_peerMgrCompactToPex( const void *    compact,
1391                        size_t          compactLen,
1392                        const uint8_t * added_f,
1393                        size_t          added_f_len,
1394                        size_t *        pexCount )
1395{
1396    size_t          i;
1397    size_t          n = compactLen / 6;
1398    const uint8_t * walk = compact;
1399    tr_pex *        pex = tr_new0( tr_pex, n );
1400
1401    for( i = 0; i < n; ++i )
1402    {
1403        pex[i].addr.type = TR_AF_INET;
1404        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
1405        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1406        if( added_f && ( n == added_f_len ) )
1407            pex[i].flags = added_f[i];
1408    }
1409
1410    *pexCount = n;
1411    return pex;
1412}
1413
1414tr_pex *
1415tr_peerMgrCompact6ToPex( const void    * compact,
1416                         size_t          compactLen,
1417                         const uint8_t * added_f,
1418                         size_t          added_f_len,
1419                         size_t        * pexCount )
1420{
1421    size_t          i;
1422    size_t          n = compactLen / 18;
1423    const uint8_t * walk = compact;
1424    tr_pex *        pex = tr_new0( tr_pex, n );
1425   
1426    for( i = 0; i < n; ++i )
1427    {
1428        pex[i].addr.type = TR_AF_INET6;
1429        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
1430        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1431        if( added_f && ( n == added_f_len ) )
1432            pex[i].flags = added_f[i];
1433    }
1434   
1435    *pexCount = n;
1436    return pex;
1437}
1438
1439tr_pex *
1440tr_peerMgrArrayToPex( const void * array,
1441                      size_t       arrayLen,
1442                      size_t      * pexCount )
1443{
1444    size_t          i;
1445    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
1446    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
1447    const uint8_t * walk = array;
1448    tr_pex        * pex = tr_new0( tr_pex, n );
1449   
1450    for( i = 0 ; i < n ; i++ ) {
1451        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
1452        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
1453        pex[i].flags = 0x00;
1454        walk += sizeof( tr_address ) + 2;
1455    }
1456   
1457    *pexCount = n;
1458    return pex;
1459}
1460
1461/**
1462***
1463**/
1464
1465void
1466tr_peerMgrSetBlame( tr_torrent     * tor,
1467                    tr_piece_index_t pieceIndex,
1468                    int              success )
1469{
1470    if( !success )
1471    {
1472        int        peerCount, i;
1473        Torrent *  t = tor->torrentPeers;
1474        tr_peer ** peers;
1475
1476        assert( torrentIsLocked( t ) );
1477
1478        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
1479        for( i = 0; i < peerCount; ++i )
1480        {
1481            tr_peer * peer = peers[i];
1482            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1483            {
1484                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1485                        tr_peerIoAddrStr( &peer->addr, peer->port ),
1486                        pieceIndex, (int)peer->strikes + 1 );
1487                addStrike( t, peer );
1488            }
1489        }
1490    }
1491}
1492
1493int
1494tr_pexCompare( const void * va, const void * vb )
1495{
1496    const tr_pex * a = va;
1497    const tr_pex * b = vb;
1498    int i;
1499
1500    assert( tr_isPex( a ) );
1501    assert( tr_isPex( b ) );
1502
1503    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
1504        return i;
1505
1506    if( a->port != b->port )
1507        return a->port < b->port ? -1 : 1;
1508
1509    return 0;
1510}
1511
1512static int
1513peerPrefersCrypto( const tr_peer * peer )
1514{
1515    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1516        return TRUE;
1517
1518    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1519        return FALSE;
1520
1521    return tr_peerIoIsEncrypted( peer->io );
1522}
1523
1524int
1525tr_peerMgrGetPeers( tr_torrent      * tor,
1526                    tr_pex         ** setme_pex,
1527                    uint8_t           af)
1528{
1529    int peersReturning = 0;
1530    const Torrent * t = tor->torrentPeers;
1531
1532    managerLock( t->manager );
1533
1534    {
1535        int i;
1536        const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1537        const int peerCount = tr_ptrArraySize( &t->peers );
1538        /* for now, this will waste memory on torrents that have both
1539         * ipv6 and ipv4 peers */
1540        tr_pex * pex = tr_new0( tr_pex, peerCount );
1541        tr_pex * walk = pex;
1542
1543        for( i=0; i<peerCount; ++i )
1544        {
1545            const tr_peer * peer = peers[i];
1546            if( peer->addr.type == af )
1547            {
1548                const struct peer_atom * atom = peer->atom;
1549
1550                assert( tr_isAddress( &peer->addr ) );
1551                walk->addr = peer->addr;
1552                walk->port = peer->port;
1553                walk->flags = 0;
1554                if( peerPrefersCrypto( peer ) )
1555                    walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1556                if( ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 ) )
1557                    walk->flags |= ADDED_F_SEED_FLAG;
1558                ++peersReturning;
1559                ++walk;
1560            }
1561        }
1562
1563        assert( ( walk - pex ) == peersReturning );
1564        qsort( pex, peersReturning, sizeof( tr_pex ), tr_pexCompare );
1565        *setme_pex = pex;
1566    }
1567
1568    managerUnlock( t->manager );
1569    return peersReturning;
1570}
1571
1572static void
1573ensureMgrTimersExist( struct tr_peerMgr * m )
1574{
1575    tr_session * s = m->session;
1576
1577    if( m->bandwidthTimer == NULL )
1578        m->bandwidthTimer = tr_timerNew( s, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
1579
1580    if( m->rechokeTimer == NULL )
1581        m->rechokeTimer = tr_timerNew( s, rechokePulse, m, RECHOKE_PERIOD_MSEC );
1582
1583    if( m->reconnectTimer == NULL )
1584        m->reconnectTimer = tr_timerNew( s, reconnectPulse, m, RECONNECT_PERIOD_MSEC );
1585
1586    if( m->refillUpkeepTimer == NULL )
1587        m->refillUpkeepTimer = tr_timerNew( s, refillUpkeep, m, REFILL_UPKEEP_PERIOD_MSEC );
1588}
1589
1590void
1591tr_peerMgrStartTorrent( tr_torrent * tor )
1592{
1593    Torrent * t = tor->torrentPeers;
1594
1595    assert( t != NULL );
1596    managerLock( t->manager );
1597    ensureMgrTimersExist( t->manager );
1598
1599    if( !t->isRunning )
1600    {
1601        t->isRunning = TRUE;
1602
1603        if( !tr_ptrArrayEmpty( &t->webseeds ) )
1604            refillSoon( t );
1605    }
1606
1607    rechokePulse( t->manager );
1608    managerUnlock( t->manager );
1609}
1610
1611static void
1612stopTorrent( Torrent * t )
1613{
1614    assert( torrentIsLocked( t ) );
1615
1616    t->isRunning = FALSE;
1617
1618    /* disconnect the peers. */
1619    tr_ptrArrayForeach( &t->peers, (PtrArrayForeachFunc)peerDestructor );
1620    tr_ptrArrayClear( &t->peers );
1621
1622    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1623     * which removes the handshake from t->outgoingHandshakes... */
1624    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
1625        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
1626}
1627
1628void
1629tr_peerMgrStopTorrent( tr_torrent * tor )
1630{
1631    Torrent * t = tor->torrentPeers;
1632
1633    managerLock( t->manager );
1634
1635    stopTorrent( t );
1636
1637    managerUnlock( t->manager );
1638}
1639
1640void
1641tr_peerMgrAddTorrent( tr_peerMgr * manager,
1642                      tr_torrent * tor )
1643{
1644    managerLock( manager );
1645
1646    assert( tor );
1647    assert( tor->torrentPeers == NULL );
1648
1649    tor->torrentPeers = torrentConstructor( manager, tor );
1650
1651    managerUnlock( manager );
1652}
1653
1654void
1655tr_peerMgrRemoveTorrent( tr_torrent * tor )
1656{
1657    tr_torrentLock( tor );
1658
1659    stopTorrent( tor->torrentPeers );
1660    torrentDestructor( tor->torrentPeers );
1661
1662    tr_torrentUnlock( tor );
1663}
1664
1665void
1666tr_peerMgrTorrentAvailability( const tr_torrent * tor,
1667                               int8_t           * tab,
1668                               unsigned int       tabCount )
1669{
1670    tr_piece_index_t   i;
1671    const Torrent *    t;
1672    float              interval;
1673    tr_bool            isSeed;
1674    int                peerCount;
1675    const tr_peer **   peers;
1676    tr_torrentLock( tor );
1677
1678    t = tor->torrentPeers;
1679    tor = t->tor;
1680    interval = tor->info.pieceCount / (float)tabCount;
1681    isSeed = tor && ( tr_cpGetStatus ( &tor->completion ) == TR_SEED );
1682    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1683    peerCount = tr_ptrArraySize( &t->peers );
1684
1685    memset( tab, 0, tabCount );
1686
1687    for( i = 0; tor && i < tabCount; ++i )
1688    {
1689        const int piece = i * interval;
1690
1691        if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
1692            tab[i] = -1;
1693        else if( peerCount ) {
1694            int j;
1695            for( j = 0; j < peerCount; ++j )
1696                if( tr_bitfieldHas( peers[j]->have, i ) )
1697                    ++tab[i];
1698        }
1699    }
1700
1701    tr_torrentUnlock( tor );
1702}
1703
1704/* Returns the pieces that are available from peers */
1705tr_bitfield*
1706tr_peerMgrGetAvailable( const tr_torrent * tor )
1707{
1708    int i;
1709    int peerCount;
1710    Torrent * t = tor->torrentPeers;
1711    const tr_peer ** peers;
1712    tr_bitfield * pieces;
1713    managerLock( t->manager );
1714
1715    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1716    peerCount = tr_ptrArraySize( &t->peers );
1717    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1718    for( i=0; i<peerCount; ++i )
1719        tr_bitfieldOr( pieces, peers[i]->have );
1720
1721    managerUnlock( t->manager );
1722    return pieces;
1723}
1724
1725void
1726tr_peerMgrTorrentStats( tr_torrent       * tor,
1727                        int              * setmePeersKnown,
1728                        int              * setmePeersConnected,
1729                        int              * setmeSeedsConnected,
1730                        int              * setmeWebseedsSendingToUs,
1731                        int              * setmePeersSendingToUs,
1732                        int              * setmePeersGettingFromUs,
1733                        int              * setmePeersFrom )
1734{
1735    int i, size;
1736    const Torrent * t = tor->torrentPeers;
1737    const tr_peer ** peers;
1738    const tr_webseed ** webseeds;
1739
1740    managerLock( t->manager );
1741
1742    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1743    size = tr_ptrArraySize( &t->peers );
1744
1745    *setmePeersKnown           = tr_ptrArraySize( &t->pool );
1746    *setmePeersConnected       = 0;
1747    *setmeSeedsConnected       = 0;
1748    *setmePeersGettingFromUs   = 0;
1749    *setmePeersSendingToUs     = 0;
1750    *setmeWebseedsSendingToUs  = 0;
1751
1752    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1753        setmePeersFrom[i] = 0;
1754
1755    for( i=0; i<size; ++i )
1756    {
1757        const tr_peer * peer = peers[i];
1758        const struct peer_atom * atom = peer->atom;
1759
1760        if( peer->io == NULL ) /* not connected */
1761            continue;
1762
1763        ++*setmePeersConnected;
1764
1765        ++setmePeersFrom[atom->from];
1766
1767        if( clientIsDownloadingFrom( peer ) )
1768            ++*setmePeersSendingToUs;
1769
1770        if( clientIsUploadingTo( peer ) )
1771            ++*setmePeersGettingFromUs;
1772
1773        if( atom->flags & ADDED_F_SEED_FLAG )
1774            ++*setmeSeedsConnected;
1775    }
1776
1777    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1778    size = tr_ptrArraySize( &t->webseeds );
1779    for( i=0; i<size; ++i )
1780        if( tr_webseedIsActive( webseeds[i] ) )
1781            ++*setmeWebseedsSendingToUs;
1782
1783    managerUnlock( t->manager );
1784}
1785
1786float
1787tr_peerMgrGetWebseedSpeed( const tr_torrent * tor, uint64_t now )
1788{
1789    int i;
1790    float tmp;
1791    float ret = 0;
1792
1793    const Torrent * t = tor->torrentPeers;
1794    const int n = tr_ptrArraySize( &t->webseeds );
1795    const tr_webseed ** webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1796
1797    for( i=0; i<n; ++i )
1798        if( tr_webseedGetSpeed( webseeds[i], now, &tmp ) )
1799            ret += tmp;
1800
1801    return ret;
1802}
1803
1804
1805float*
1806tr_peerMgrWebSpeeds( const tr_torrent * tor )
1807{
1808    const Torrent * t = tor->torrentPeers;
1809    const tr_webseed ** webseeds;
1810    int i;
1811    int webseedCount;
1812    float * ret;
1813    uint64_t now;
1814
1815    assert( t->manager );
1816    managerLock( t->manager );
1817
1818    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1819    webseedCount = tr_ptrArraySize( &t->webseeds );
1820    assert( webseedCount == tor->info.webseedCount );
1821    ret = tr_new0( float, webseedCount );
1822    now = tr_date( );
1823
1824    for( i=0; i<webseedCount; ++i )
1825        if( !tr_webseedGetSpeed( webseeds[i], now, &ret[i] ) )
1826            ret[i] = -1.0;
1827
1828    managerUnlock( t->manager );
1829    return ret;
1830}
1831
1832double
1833tr_peerGetPieceSpeed( const tr_peer * peer, uint64_t now, tr_direction direction )
1834{
1835    return peer->io ? tr_peerIoGetPieceSpeed( peer->io, now, direction ) : 0.0;
1836}
1837
1838
1839struct tr_peer_stat *
1840tr_peerMgrPeerStats( const tr_torrent    * tor,
1841                     int                 * setmeCount )
1842{
1843    int i, size;
1844    const Torrent * t = tor->torrentPeers;
1845    const tr_peer ** peers;
1846    tr_peer_stat * ret;
1847    uint64_t now;
1848
1849    assert( t->manager );
1850    managerLock( t->manager );
1851
1852    size = tr_ptrArraySize( &t->peers );
1853    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1854    ret = tr_new0( tr_peer_stat, size );
1855    now = tr_date( );
1856
1857    for( i=0; i<size; ++i )
1858    {
1859        char *                   pch;
1860        const tr_peer *          peer = peers[i];
1861        const struct peer_atom * atom = peer->atom;
1862        tr_peer_stat *           stat = ret + i;
1863
1864        tr_ntop( &peer->addr, stat->addr, sizeof( stat->addr ) );
1865        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
1866                   sizeof( stat->client ) );
1867        stat->port               = ntohs( peer->port );
1868        stat->from               = atom->from;
1869        stat->progress           = peer->progress;
1870        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1871        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER );
1872        stat->rateToClient       = tr_peerGetPieceSpeed( peer, now, TR_PEER_TO_CLIENT );
1873        stat->peerIsChoked       = peer->peerIsChoked;
1874        stat->peerIsInterested   = peer->peerIsInterested;
1875        stat->clientIsChoked     = peer->clientIsChoked;
1876        stat->clientIsInterested = peer->clientIsInterested;
1877        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1878        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1879        stat->isUploadingTo      = clientIsUploadingTo( peer );
1880        stat->isSeed             = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
1881
1882        pch = stat->flagStr;
1883        if( t->optimistic == peer ) *pch++ = 'O';
1884        if( stat->isDownloadingFrom ) *pch++ = 'D';
1885        else if( stat->clientIsInterested ) *pch++ = 'd';
1886        if( stat->isUploadingTo ) *pch++ = 'U';
1887        else if( stat->peerIsInterested ) *pch++ = 'u';
1888        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
1889        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1890        if( stat->isEncrypted ) *pch++ = 'E';
1891        if( stat->from == TR_PEER_FROM_DHT ) *pch++ = 'H';
1892        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1893        if( stat->isIncoming ) *pch++ = 'I';
1894        *pch = '\0';
1895    }
1896
1897    *setmeCount = size;
1898
1899    managerUnlock( t->manager );
1900    return ret;
1901}
1902
1903/**
1904***
1905**/
1906
1907struct ChokeData
1908{
1909    tr_bool         doUnchoke;
1910    tr_bool         isInterested;
1911    tr_bool         isChoked;
1912    int             rate;
1913    tr_peer *       peer;
1914};
1915
1916static int
1917compareChoke( const void * va,
1918              const void * vb )
1919{
1920    const struct ChokeData * a = va;
1921    const struct ChokeData * b = vb;
1922
1923    if( a->rate != b->rate ) /* prefer higher overall speeds */
1924        return a->rate > b->rate ? -1 : 1;
1925
1926    if( a->isChoked != b->isChoked ) /* prefer unchoked */
1927        return a->isChoked ? 1 : -1;
1928
1929    return 0;
1930}
1931
1932static int
1933isNew( const tr_peer * peer )
1934{
1935    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1936}
1937
1938static int
1939isSame( const tr_peer * peer )
1940{
1941    return peer && peer->client && strstr( peer->client, "Transmission" );
1942}
1943
1944/**
1945***
1946**/
1947
1948static void
1949rechokeTorrent( Torrent * t, const uint64_t now )
1950{
1951    int i, size, unchokedInterested;
1952    const int peerCount = tr_ptrArraySize( &t->peers );
1953    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1954    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1955    const tr_session * session = t->manager->session;
1956    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
1957
1958    assert( torrentIsLocked( t ) );
1959
1960    /* sort the peers by preference and rate */
1961    for( i = 0, size = 0; i < peerCount; ++i )
1962    {
1963        tr_peer * peer = peers[i];
1964        struct peer_atom * atom = peer->atom;
1965
1966        if( peer->progress >= 1.0 ) /* choke all seeds */
1967        {
1968            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1969        }
1970        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
1971        {
1972            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1973        }
1974        else if( chokeAll ) /* choke everyone if we're not uploading */
1975        {
1976            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1977        }
1978        else
1979        {
1980            struct ChokeData * n = &choke[size++];
1981            n->peer         = peer;
1982            n->isInterested = peer->peerIsInterested;
1983            n->isChoked     = peer->peerIsChoked;
1984            n->rate         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER ) * 1024;
1985        }
1986    }
1987
1988    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
1989
1990    /**
1991     * Reciprocation and number of uploads capping is managed by unchoking
1992     * the N peers which have the best upload rate and are interested.
1993     * This maximizes the client's download rate. These N peers are
1994     * referred to as downloaders, because they are interested in downloading
1995     * from the client.
1996     *
1997     * Peers which have a better upload rate (as compared to the downloaders)
1998     * but aren't interested get unchoked. If they become interested, the
1999     * downloader with the worst upload rate gets choked. If a client has
2000     * a complete file, it uses its upload rate rather than its download
2001     * rate to decide which peers to unchoke.
2002     */
2003    unchokedInterested = 0;
2004    for( i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i ) {
2005        choke[i].doUnchoke = 1;
2006        if( choke[i].isInterested )
2007            ++unchokedInterested;
2008    }
2009
2010    /* optimistic unchoke */
2011    if( i < size )
2012    {
2013        int n;
2014        struct ChokeData * c;
2015        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
2016
2017        for( ; i<size; ++i )
2018        {
2019            if( choke[i].isInterested )
2020            {
2021                const tr_peer * peer = choke[i].peer;
2022                int x = 1, y;
2023                if( isNew( peer ) ) x *= 3;
2024                if( isSame( peer ) ) x *= 3;
2025                for( y=0; y<x; ++y )
2026                    tr_ptrArrayAppend( &randPool, &choke[i] );
2027            }
2028        }
2029
2030        if(( n = tr_ptrArraySize( &randPool )))
2031        {
2032            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
2033            c->doUnchoke = 1;
2034            t->optimistic = c->peer;
2035        }
2036
2037        tr_ptrArrayDestruct( &randPool, NULL );
2038    }
2039
2040    for( i=0; i<size; ++i )
2041        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
2042
2043    /* cleanup */
2044    tr_free( choke );
2045}
2046
2047static int
2048rechokePulse( void * vmgr )
2049{
2050    uint64_t now;
2051    tr_torrent * tor = NULL;
2052    tr_peerMgr * mgr = vmgr;
2053    managerLock( mgr );
2054
2055    now = tr_date( );
2056    while(( tor = tr_torrentNext( mgr->session, tor )))
2057        if( tor->isRunning )
2058            rechokeTorrent( tor->torrentPeers, now );
2059
2060    managerUnlock( mgr );
2061    return TRUE;
2062}
2063
2064/***
2065****
2066****  Life and Death
2067****
2068***/
2069
2070typedef enum
2071{
2072    TR_CAN_KEEP,
2073    TR_CAN_CLOSE,
2074    TR_MUST_CLOSE,
2075}
2076tr_close_type_t;
2077
2078static tr_close_type_t
2079shouldPeerBeClosed( const Torrent    * t,
2080                    const tr_peer    * peer,
2081                    int                peerCount )
2082{
2083    const tr_torrent *       tor = t->tor;
2084    const time_t             now = time( NULL );
2085    const struct peer_atom * atom = peer->atom;
2086
2087    /* if it's marked for purging, close it */
2088    if( peer->doPurge )
2089    {
2090        tordbg( t, "purging peer %s because its doPurge flag is set",
2091                tr_peerIoAddrStr( &atom->addr, atom->port ) );
2092        return TR_MUST_CLOSE;
2093    }
2094
2095    /* if we're seeding and the peer has everything we have,
2096     * and enough time has passed for a pex exchange, then disconnect */
2097    if( tr_torrentIsSeed( tor ) )
2098    {
2099        int peerHasEverything;
2100        if( atom->flags & ADDED_F_SEED_FLAG )
2101            peerHasEverything = TRUE;
2102        else if( peer->progress < tr_cpPercentDone( &tor->completion ) )
2103            peerHasEverything = FALSE;
2104        else {
2105            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( &tor->completion ) );
2106            tr_bitfieldDifference( tmp, peer->have );
2107            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2108            tr_bitfieldFree( tmp );
2109        }
2110
2111        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 )))
2112        {
2113            tordbg( t, "purging peer %s because we're both seeds",
2114                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
2115            return TR_MUST_CLOSE;
2116        }
2117    }
2118
2119    /* disconnect if it's been too long since piece data has been transferred.
2120     * this is on a sliding scale based on number of available peers... */
2121    {
2122        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2123        /* if we have >= relaxIfFewerThan, strictness is 100%.
2124         * if we have zero connections, strictness is 0% */
2125        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
2126                               ? 1.0
2127                               : peerCount / (float)relaxStrictnessIfFewerThanN;
2128        const int lo = MIN_UPLOAD_IDLE_SECS;
2129        const int hi = MAX_UPLOAD_IDLE_SECS;
2130        const int limit = hi - ( ( hi - lo ) * strictness );
2131        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
2132/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
2133        if( idleTime > limit ) {
2134            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
2135                       tr_peerIoAddrStr( &atom->addr, atom->port ), idleTime );
2136            return TR_CAN_CLOSE;
2137        }
2138    }
2139
2140    return TR_CAN_KEEP;
2141}
2142
2143static tr_peer **
2144getPeersToClose( Torrent * t, tr_close_type_t closeType, int * setmeSize )
2145{
2146    int i, peerCount, outsize;
2147    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
2148    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2149
2150    assert( torrentIsLocked( t ) );
2151
2152    for( i = outsize = 0; i < peerCount; ++i )
2153        if( shouldPeerBeClosed( t, peers[i], peerCount ) == closeType )
2154            ret[outsize++] = peers[i];
2155
2156    *setmeSize = outsize;
2157    return ret;
2158}
2159
2160static int
2161compareCandidates( const void * va, const void * vb )
2162{
2163    const struct peer_atom * a = *(const struct peer_atom**) va;
2164    const struct peer_atom * b = *(const struct peer_atom**) vb;
2165
2166    /* <Charles> Here we would probably want to try reconnecting to
2167     * peers that had most recently given us data. Lots of users have
2168     * trouble with resets due to their routers and/or ISPs. This way we
2169     * can quickly recover from an unwanted reset. So we sort
2170     * piece_data_time in descending order.
2171     */
2172
2173    if( a->piece_data_time != b->piece_data_time )
2174        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2175
2176    if( a->numFails != b->numFails )
2177        return a->numFails < b->numFails ? -1 : 1;
2178
2179    if( a->time != b->time )
2180        return a->time < b->time ? -1 : 1;
2181
2182    /* In order to avoid fragmenting the swarm, peers from trackers and
2183     * from the DHT should be preferred to peers from PEX. */
2184    if( a->from != b->from )
2185        return a->from < b->from ? -1 : 1;
2186
2187    return 0;
2188}
2189
2190static int
2191getReconnectIntervalSecs( const struct peer_atom * atom )
2192{
2193    int          sec;
2194    const time_t now = time( NULL );
2195
2196    /* if we were recently connected to this peer and transferring piece
2197     * data, try to reconnect to them sooner rather that later -- we don't
2198     * want network troubles to get in the way of a good peer. */
2199    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2200        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2201
2202    /* don't allow reconnects more often than our minimum */
2203    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2204        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2205
2206    /* otherwise, the interval depends on how many times we've tried
2207     * and failed to connect to the peer */
2208    else switch( atom->numFails ) {
2209        case 0: sec = 0; break;
2210        case 1: sec = 5; break;
2211        case 2: sec = 2 * 60; break;
2212        case 3: sec = 15 * 60; break;
2213        case 4: sec = 30 * 60; break;
2214        case 5: sec = 60 * 60; break;
2215        default: sec = 120 * 60; break;
2216    }
2217
2218    return sec;
2219}
2220
2221static struct peer_atom **
2222getPeerCandidates( Torrent * t, int * setmeSize )
2223{
2224    int                 i, atomCount, retCount;
2225    struct peer_atom ** atoms;
2226    struct peer_atom ** ret;
2227    const time_t        now = time( NULL );
2228    const int           seed = tr_torrentIsSeed( t->tor );
2229
2230    assert( torrentIsLocked( t ) );
2231
2232    atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
2233    ret = tr_new( struct peer_atom*, atomCount );
2234    for( i = retCount = 0; i < atomCount; ++i )
2235    {
2236        int                interval;
2237        struct peer_atom * atom = atoms[i];
2238
2239        /* peer fed us too much bad data ... we only keep it around
2240         * now to weed it out in case someone sends it to us via pex */
2241        if( atom->myflags & MYFLAG_BANNED )
2242            continue;
2243
2244        /* peer was unconnectable before, so we're not going to keep trying.
2245         * this is needs a separate flag from `banned', since if they try
2246         * to connect to us later, we'll let them in */
2247        if( atom->myflags & MYFLAG_UNREACHABLE )
2248            continue;
2249
2250        /* we don't need two connections to the same peer... */
2251        if( peerIsInUse( t, &atom->addr ) )
2252            continue;
2253
2254        /* no need to connect if we're both seeds... */
2255        if( seed && ( ( atom->flags & ADDED_F_SEED_FLAG ) ||
2256                      ( atom->uploadOnly == UPLOAD_ONLY_YES ) ) )
2257            continue;
2258
2259        /* don't reconnect too often */
2260        interval = getReconnectIntervalSecs( atom );
2261        if( ( now - atom->time ) < interval )
2262        {
2263            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2264                    i, tr_peerIoAddrStr( &atom->addr, atom->port ), interval );
2265            continue;
2266        }
2267
2268        /* Don't connect to peers in our blocklist */
2269        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2270            continue;
2271
2272        ret[retCount++] = atom;
2273    }
2274
2275    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2276    *setmeSize = retCount;
2277    return ret;
2278}
2279
2280static void
2281closePeer( Torrent * t, tr_peer * peer )
2282{
2283    struct peer_atom * atom;
2284
2285    assert( t != NULL );
2286    assert( peer != NULL );
2287
2288    atom = peer->atom;
2289
2290    /* if we transferred piece data, then they might be good peers,
2291       so reset their `numFails' weight to zero.  otherwise we connected
2292       to them fruitlessly, so mark it as another fail */
2293    if( atom->piece_data_time )
2294        atom->numFails = 0;
2295    else
2296        ++atom->numFails;
2297
2298    tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
2299    removePeer( t, peer );
2300}
2301
2302static void
2303reconnectTorrent( Torrent * t )
2304{
2305    static time_t prevTime = 0;
2306    static int    newConnectionsThisSecond = 0;
2307    time_t        now;
2308
2309    now = time( NULL );
2310    if( prevTime != now )
2311    {
2312        prevTime = now;
2313        newConnectionsThisSecond = 0;
2314    }
2315
2316    if( !t->isRunning )
2317    {
2318        removeAllPeers( t );
2319    }
2320    else
2321    {
2322        int i;
2323        int canCloseCount;
2324        int mustCloseCount;
2325        int candidateCount;
2326        int maxCandidates;
2327        struct tr_peer ** canClose = getPeersToClose( t, TR_CAN_CLOSE, &canCloseCount );
2328        struct tr_peer ** mustClose = getPeersToClose( t, TR_MUST_CLOSE, &mustCloseCount );
2329        struct peer_atom ** candidates = getPeerCandidates( t, &candidateCount );
2330
2331        tordbg( t, "reconnect pulse for [%s]: "
2332                   "%d must-close connections, "
2333                   "%d can-close connections, "
2334                   "%d connection candidates, "
2335                   "%d atoms, "
2336                   "max per pulse is %d",
2337                   t->tor->info.name,
2338                   mustCloseCount,
2339                   canCloseCount,
2340                   candidateCount,
2341                   tr_ptrArraySize( &t->pool ),
2342                   MAX_RECONNECTIONS_PER_PULSE );
2343
2344        /* disconnect the really bad peers */
2345        for( i=0; i<mustCloseCount; ++i )
2346            closePeer( t, mustClose[i] );
2347
2348        /* decide how many peers can we try to add in this pass */
2349        maxCandidates = candidateCount;
2350        maxCandidates = MIN( maxCandidates, MAX_RECONNECTIONS_PER_PULSE );
2351        maxCandidates = MIN( maxCandidates, getMaxPeerCount( t->tor ) - getPeerCount( t ) );
2352        maxCandidates = MIN( maxCandidates, MAX_CONNECTIONS_PER_SECOND - newConnectionsThisSecond );
2353
2354        /* maybe disconnect some lesser peers, if we have candidates to replace them with */
2355        for( i=0; ( i<canCloseCount ) && ( i<maxCandidates ); ++i )
2356            closePeer( t, canClose[i] );
2357
2358        tordbg( t, "candidateCount is %d, MAX_RECONNECTIONS_PER_PULSE is %d,"
2359                   " getPeerCount(t) is %d, getMaxPeerCount(t) is %d, "
2360                   "newConnectionsThisSecond is %d, MAX_CONNECTIONS_PER_SECOND is %d",
2361                   candidateCount,
2362                   MAX_RECONNECTIONS_PER_PULSE,
2363                   getPeerCount( t ),
2364                   getMaxPeerCount( t->tor ),
2365                   newConnectionsThisSecond, MAX_CONNECTIONS_PER_SECOND );
2366
2367        /* add some new ones */
2368        for( i=0; i<maxCandidates; ++i )
2369        {
2370            tr_peerMgr        * mgr = t->manager;
2371            struct peer_atom  * atom = candidates[i];
2372            tr_peerIo         * io;
2373
2374            tordbg( t, "Starting an OUTGOING connection with %s",
2375                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2376
2377            io = tr_peerIoNewOutgoing( mgr->session, mgr->session->bandwidth, &atom->addr, atom->port, t->tor->info.hash );
2378
2379            if( io == NULL )
2380            {
2381                tordbg( t, "peerIo not created; marking peer %s as unreachable",
2382                        tr_peerIoAddrStr( &atom->addr, atom->port ) );
2383                atom->myflags |= MYFLAG_UNREACHABLE;
2384            }
2385            else
2386            {
2387                tr_handshake * handshake = tr_handshakeNew( io,
2388                                                            mgr->session->encryptionMode,
2389                                                            myHandshakeDoneCB,
2390                                                            mgr );
2391
2392                assert( tr_peerIoGetTorrentHash( io ) );
2393
2394                tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewOutgoing() */
2395
2396                ++newConnectionsThisSecond;
2397
2398                tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
2399                                         handshakeCompare );
2400            }
2401
2402            atom->time = time( NULL );
2403        }
2404
2405        /* cleanup */
2406        tr_free( candidates );
2407        tr_free( mustClose );
2408        tr_free( canClose );
2409    }
2410}
2411
2412struct peer_liveliness
2413{
2414    tr_peer * peer;
2415    void * clientData;
2416    time_t pieceDataTime;
2417    time_t time;
2418    int speed;
2419    tr_bool doPurge;
2420};
2421
2422static int
2423comparePeerLiveliness( const void * va, const void * vb )
2424{
2425    const struct peer_liveliness * a = va;
2426    const struct peer_liveliness * b = vb;
2427
2428    if( a->doPurge != b->doPurge )
2429        return a->doPurge ? 1 : -1;
2430
2431    if( a->speed != b->speed ) /* faster goes first */
2432        return a->speed > b->speed ? -1 : 1;
2433
2434    /* the one to give us data more recently goes first */
2435    if( a->pieceDataTime != b->pieceDataTime )
2436        return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
2437
2438    /* the one we connected to most recently goes first */
2439    if( a->time != b->time )
2440        return a->time > b->time ? -1 : 1;
2441
2442    return 0;
2443}
2444
2445/* FIXME: getPeersToClose() should use this */
2446static void
2447sortPeersByLiveliness( tr_peer ** peers, void** clientData, int n, uint64_t now )
2448{
2449    int i;
2450    struct peer_liveliness *lives, *l;
2451
2452    /* build a sortable array of peer + extra info */
2453    lives = l = tr_new0( struct peer_liveliness, n );
2454    for( i=0; i<n; ++i, ++l ) {
2455        tr_peer * p = peers[i];
2456        l->peer = p;
2457        l->doPurge = p->doPurge;
2458        l->pieceDataTime = p->atom->piece_data_time;
2459        l->time = p->atom->time;
2460        l->speed = 1024.0 * (   tr_peerGetPieceSpeed( p, now, TR_UP )
2461                              + tr_peerGetPieceSpeed( p, now, TR_DOWN ) );
2462        if( clientData )
2463            l->clientData = clientData[i];
2464    }
2465
2466    /* sort 'em */
2467    assert( n == ( l - lives ) );
2468    qsort( lives, n, sizeof( struct peer_liveliness ), comparePeerLiveliness );
2469
2470    /* build the peer array */
2471    for( i=0, l=lives; i<n; ++i, ++l ) {
2472        peers[i] = l->peer;
2473        if( clientData )
2474            clientData[i] = l->clientData;
2475    }
2476    assert( n == ( l - lives ) );
2477
2478    /* cleanup */
2479    tr_free( lives );
2480}
2481
2482static void
2483enforceTorrentPeerLimit( Torrent * t, uint64_t now )
2484{
2485    int n = tr_ptrArraySize( &t->peers );
2486    const int max = tr_sessionGetPeerLimitPerTorrent( t->tor->session );
2487    if( n > max )
2488    {
2489        void * base = tr_ptrArrayBase( &t->peers );
2490        tr_peer ** peers = tr_memdup( base, n*sizeof( tr_peer* ) );
2491        sortPeersByLiveliness( peers, NULL, n, now );
2492        while( n > max )
2493            closePeer( t, peers[--n] );
2494        tr_free( peers );
2495    }
2496}
2497
2498static void
2499enforceSessionPeerLimit( tr_session * session, uint64_t now )
2500{
2501    int n = 0;
2502    tr_torrent * tor = NULL;
2503    const int max = tr_sessionGetPeerLimit( session );
2504
2505    /* count the total number of peers */
2506    while(( tor = tr_torrentNext( session, tor )))
2507        n += tr_ptrArraySize( &tor->torrentPeers->peers );
2508
2509    /* if there are too many, prune out the worst */ 
2510    if( n > max )
2511    {
2512        tr_peer ** peers = tr_new( tr_peer*, n );
2513        Torrent ** torrents = tr_new( Torrent*, n );
2514
2515        /* populate the peer array */
2516        n = 0;
2517        tor = NULL;
2518        while(( tor = tr_torrentNext( session, tor ))) {
2519            int i;
2520            Torrent * t = tor->torrentPeers;
2521            const int tn = tr_ptrArraySize( &t->peers );
2522            for( i=0; i<tn; ++i, ++n ) {
2523                peers[n] = tr_ptrArrayNth( &t->peers, i );
2524                torrents[n] = t;
2525            }
2526        }
2527       
2528        /* sort 'em */
2529        sortPeersByLiveliness( peers, (void**)torrents, n, now );
2530
2531        /* cull out the crappiest */
2532        while( n-- > max )
2533            closePeer( torrents[n], peers[n] );
2534
2535        /* cleanup */
2536        tr_free( torrents );
2537        tr_free( peers );
2538    }
2539}
2540
2541
2542static int
2543reconnectPulse( void * vmgr )
2544{
2545    tr_torrent * tor;
2546    tr_peerMgr * mgr = vmgr;
2547    uint64_t now;
2548    managerLock( mgr );
2549
2550    now = tr_date( );
2551
2552    /* if we're over the per-torrent peer limits, cull some peers */
2553    tor = NULL;
2554    while(( tor = tr_torrentNext( mgr->session, tor )))
2555        if( tor->isRunning )
2556            enforceTorrentPeerLimit( tor->torrentPeers, now );
2557
2558    /* if we're over the per-session peer limits, cull some peers */
2559    enforceSessionPeerLimit( mgr->session, now );
2560
2561    tor = NULL;
2562    while(( tor = tr_torrentNext( mgr->session, tor )))
2563        if( tor->isRunning )
2564            reconnectTorrent( tor->torrentPeers );
2565
2566    managerUnlock( mgr );
2567    return TRUE;
2568}
2569
2570/****
2571*****
2572*****  BANDWIDTH ALLOCATION
2573*****
2574****/
2575
2576static void
2577pumpAllPeers( tr_peerMgr * mgr )
2578{
2579    tr_torrent * tor = NULL;
2580
2581    while(( tor = tr_torrentNext( mgr->session, tor )))
2582    {
2583        int j;
2584        Torrent * t = tor->torrentPeers;
2585
2586        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
2587        {
2588            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
2589            tr_peerMsgsPulse( peer->msgs );
2590        }
2591    }
2592}
2593
2594static int
2595bandwidthPulse( void * vmgr )
2596{
2597    tr_torrent * tor = NULL;
2598    tr_peerMgr * mgr = vmgr;
2599    managerLock( mgr );
2600
2601    /* FIXME: this next line probably isn't necessary... */
2602    pumpAllPeers( mgr );
2603
2604    /* allocate bandwidth to the peers */
2605    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
2606    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
2607
2608    /* possibly stop torrents that have seeded enough */
2609    while(( tor = tr_torrentNext( mgr->session, tor ))) {
2610        if( tor->needsSeedRatioCheck ) {
2611            tor->needsSeedRatioCheck = FALSE;
2612            tr_torrentCheckSeedRatio( tor );
2613        }
2614    }
2615
2616    managerUnlock( mgr );
2617    return TRUE;
2618}
Note: See TracBrowser for help on using the repository browser.