source: trunk/libtransmission/peer-mgr.c @ 9073

Last change on this file since 9073 was 9073, checked in by charles, 13 years ago

delete the obsolete zsh bindings

  • Property svn:keywords set to Date Rev Author Id
File size: 73.7 KB
Line 
1/*
2 * This file Copyright (C) 2007-2009 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 9073 2009-09-08 20:01:12Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h> /* memcpy, memcmp, strstr */
16#include <stdlib.h> /* qsort */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "bandwidth.h"
23#include "bencode.h"
24#include "blocklist.h"
25#include "clients.h"
26#include "completion.h"
27#include "crypto.h"
28#include "fdlimit.h"
29#include "handshake.h"
30#include "inout.h" /* tr_ioTestPiece */
31#include "net.h"
32#include "peer-io.h"
33#include "peer-mgr.h"
34#include "peer-msgs.h"
35#include "ptrarray.h"
36#include "session.h"
37#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
38#include "torrent.h"
39#include "trevent.h"
40#include "utils.h"
41#include "webseed.h"
42
43enum
44{
45    /* how frequently to change which peers are choked */
46    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
47
48    /* minimum interval for refilling peers' request lists */
49    REFILL_PERIOD_MSEC = 400,
50
51    /* how frequently to reallocate bandwidth */
52    BANDWIDTH_PERIOD_MSEC = 500,
53
54    /* how frequently to age out old piece request lists */
55    REFILL_UPKEEP_PERIOD_MSEC = ( 10 * 1000 ),
56
57    /* how frequently to decide which peers live and die */
58    RECONNECT_PERIOD_MSEC = 500,
59
60    /* when many peers are available, keep idle ones this long */
61    MIN_UPLOAD_IDLE_SECS = ( 30 ),
62
63    /* when few peers are available, keep idle ones this long */
64    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
65
66    /* max # of peers to ask fer per torrent per reconnect pulse */
67    MAX_RECONNECTIONS_PER_PULSE = 16,
68
69    /* max number of peers to ask for per second overall.
70    * this throttle is to avoid overloading the router */
71    MAX_CONNECTIONS_PER_SECOND = 32,
72
73    /* number of bad pieces a peer is allowed to send before we ban them */
74    MAX_BAD_PIECES_PER_PEER = 5,
75
76    /* amount of time to keep a list of request pieces lying around
77       before it's considered too old and needs to be rebuilt */
78    PIECE_LIST_SHELF_LIFE_SECS = 60,
79
80    /* use for bitwise operations w/peer_atom.myflags */
81    MYFLAG_BANNED = 1,
82
83    /* use for bitwise operations w/peer_atom.myflags */
84    /* unreachable for now... but not banned.
85     * if they try to connect to us it's okay */
86    MYFLAG_UNREACHABLE = 2,
87
88    /* the minimum we'll wait before attempting to reconnect to a peer */
89    MINIMUM_RECONNECT_INTERVAL_SECS = 5
90};
91
92
93/**
94***
95**/
96
97enum
98{
99    UPLOAD_ONLY_UKNOWN,
100    UPLOAD_ONLY_YES,
101    UPLOAD_ONLY_NO
102};
103
104/**
105 * Peer information that should be kept even before we've connected and
106 * after we've disconnected.  These are kept in a pool of peer_atoms to decide
107 * which ones would make good candidates for connecting to, and to watch out
108 * for banned peers.
109 *
110 * @see tr_peer
111 * @see tr_peermsgs
112 */
113struct peer_atom
114{
115    uint8_t     from;
116    uint8_t     flags;       /* these match the added_f flags */
117    uint8_t     myflags;     /* flags that aren't defined in added_f */
118    uint8_t     uploadOnly;  /* UPLOAD_ONLY_ */
119    tr_port     port;
120    uint16_t    numFails;
121    tr_address  addr;
122    time_t      time;        /* when the peer's connection status last changed */
123    time_t      piece_data_time;
124};
125
126struct tr_blockIterator
127{
128    time_t expirationDate;
129    struct tr_torrent_peers * t;
130    tr_block_index_t blockIndex, blockCount, *blocks;
131    tr_piece_index_t pieceIndex, pieceCount, *pieces;
132};
133
134typedef struct tr_torrent_peers
135{
136    struct event               refillTimer;
137
138    tr_ptrArray                outgoingHandshakes; /* tr_handshake */
139    tr_ptrArray                pool; /* struct peer_atom */
140    tr_ptrArray                peers; /* tr_peer */
141    tr_ptrArray                webseeds; /* tr_webseed */
142
143    tr_torrent               * tor;
144    tr_peer                  * optimistic; /* the optimistic peer, or NULL if none */
145    struct tr_blockIterator  * refillQueue; /* used in refillPulse() */
146    struct tr_peerMgr        * manager;
147    int                      * pendingRequestCount;
148
149    tr_bool                    isRunning;
150}
151Torrent;
152
153struct tr_peerMgr
154{
155    tr_session      * session;
156    tr_ptrArray       incomingHandshakes; /* tr_handshake */
157    tr_timer        * bandwidthTimer;
158    tr_timer        * rechokeTimer;
159    tr_timer        * reconnectTimer;
160    tr_timer        * refillUpkeepTimer;
161};
162
163#define tordbg( t, ... ) \
164    do { \
165        if( tr_deepLoggingIsActive( ) ) \
166            tr_deepLog( __FILE__, __LINE__, t->tor->info.name, __VA_ARGS__ ); \
167    } while( 0 )
168
169#define dbgmsg( ... ) \
170    do { \
171        if( tr_deepLoggingIsActive( ) ) \
172            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
173    } while( 0 )
174
175/**
176***
177**/
178
179static TR_INLINE void
180managerLock( const struct tr_peerMgr * manager )
181{
182    tr_globalLock( manager->session );
183}
184
185static TR_INLINE void
186managerUnlock( const struct tr_peerMgr * manager )
187{
188    tr_globalUnlock( manager->session );
189}
190
191static TR_INLINE void
192torrentLock( Torrent * torrent )
193{
194    managerLock( torrent->manager );
195}
196
197static TR_INLINE void
198torrentUnlock( Torrent * torrent )
199{
200    managerUnlock( torrent->manager );
201}
202
203static TR_INLINE int
204torrentIsLocked( const Torrent * t )
205{
206    return tr_globalIsLocked( t->manager->session );
207}
208
209/**
210***
211**/
212
213static int
214handshakeCompareToAddr( const void * va, const void * vb )
215{
216    const tr_handshake * a = va;
217
218    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
219}
220
221static int
222handshakeCompare( const void * a, const void * b )
223{
224    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
225}
226
227static tr_handshake*
228getExistingHandshake( tr_ptrArray      * handshakes,
229                      const tr_address * addr )
230{
231    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
232}
233
234static int
235comparePeerAtomToAddress( const void * va, const void * vb )
236{
237    const struct peer_atom * a = va;
238
239    return tr_compareAddresses( &a->addr, vb );
240}
241
242static int
243comparePeerAtoms( const void * va, const void * vb )
244{
245    const struct peer_atom * b = vb;
246
247    return comparePeerAtomToAddress( va, &b->addr );
248}
249
250/**
251***
252**/
253
254static Torrent*
255getExistingTorrent( tr_peerMgr *    manager,
256                    const uint8_t * hash )
257{
258    tr_torrent * tor = tr_torrentFindFromHash( manager->session, hash );
259
260    return tor == NULL ? NULL : tor->torrentPeers;
261}
262
263static int
264peerCompare( const void * va, const void * vb )
265{
266    const tr_peer * a = va;
267    const tr_peer * b = vb;
268
269    return tr_compareAddresses( &a->addr, &b->addr );
270}
271
272static int
273peerCompareToAddr( const void * va, const void * vb )
274{
275    const tr_peer * a = va;
276
277    return tr_compareAddresses( &a->addr, vb );
278}
279
280static tr_peer*
281getExistingPeer( Torrent          * torrent,
282                 const tr_address * addr )
283{
284    assert( torrentIsLocked( torrent ) );
285    assert( addr );
286
287    return tr_ptrArrayFindSorted( &torrent->peers, addr, peerCompareToAddr );
288}
289
290static struct peer_atom*
291getExistingAtom( const Torrent    * t,
292                 const tr_address * addr )
293{
294    Torrent * tt = (Torrent*)t;
295    assert( torrentIsLocked( t ) );
296    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
297}
298
299static tr_bool
300peerIsInUse( const Torrent    * ct,
301             const tr_address * addr )
302{
303    Torrent * t = (Torrent*) ct;
304
305    assert( torrentIsLocked ( t ) );
306
307    return getExistingPeer( t, addr )
308        || getExistingHandshake( &t->outgoingHandshakes, addr )
309        || getExistingHandshake( &t->manager->incomingHandshakes, addr );
310}
311
312static tr_peer*
313peerConstructor( const tr_address * addr )
314{
315    tr_peer * p;
316    p = tr_new0( tr_peer, 1 );
317    p->addr = *addr;
318    return p;
319}
320
321static tr_peer*
322getPeer( Torrent          * torrent,
323         const tr_address * addr )
324{
325    tr_peer * peer;
326
327    assert( torrentIsLocked( torrent ) );
328
329    peer = getExistingPeer( torrent, addr );
330
331    if( peer == NULL )
332    {
333        peer = peerConstructor( addr );
334        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
335    }
336
337    return peer;
338}
339
340static void
341peerDestructor( tr_peer * peer )
342{
343    assert( peer );
344
345    if( peer->msgs != NULL )
346    {
347        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
348        tr_peerMsgsFree( peer->msgs );
349    }
350
351    tr_peerIoClear( peer->io );
352    tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
353
354    tr_bitfieldFree( peer->have );
355    tr_bitfieldFree( peer->blame );
356    tr_free( peer->client );
357
358    tr_free( peer );
359}
360
361static void
362removePeer( Torrent * t, tr_peer * peer )
363{
364    tr_peer * removed;
365    struct peer_atom * atom = peer->atom;
366
367    assert( torrentIsLocked( t ) );
368    assert( atom );
369
370    atom->time = time( NULL );
371
372    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
373    assert( removed == peer );
374    peerDestructor( removed );
375}
376
377static void
378removeAllPeers( Torrent * t )
379{
380    while( !tr_ptrArrayEmpty( &t->peers ) )
381        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
382}
383
384static void blockIteratorFree( struct tr_blockIterator ** inout );
385
386static void
387torrentDestructor( void * vt )
388{
389    Torrent * t = vt;
390
391    assert( t );
392    assert( !t->isRunning );
393    assert( torrentIsLocked( t ) );
394    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
395    assert( tr_ptrArrayEmpty( &t->peers ) );
396
397    evtimer_del( &t->refillTimer );
398
399    blockIteratorFree( &t->refillQueue );
400    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
401    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
402    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
403    tr_ptrArrayDestruct( &t->peers, NULL );
404
405    tr_free( t->pendingRequestCount );
406    tr_free( t );
407}
408
409
410static void refillPulse( int, short, void* );
411
412static void peerCallbackFunc( void * vpeer,
413                              void * vevent,
414                              void * vt );
415
416static Torrent*
417torrentConstructor( tr_peerMgr * manager,
418                    tr_torrent * tor )
419{
420    int       i;
421    Torrent * t;
422
423    t = tr_new0( Torrent, 1 );
424    t->manager = manager;
425    t->tor = tor;
426    t->pool = TR_PTR_ARRAY_INIT;
427    t->peers = TR_PTR_ARRAY_INIT;
428    t->webseeds = TR_PTR_ARRAY_INIT;
429    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
430    evtimer_set( &t->refillTimer, refillPulse, t );
431
432
433    for( i = 0; i < tor->info.webseedCount; ++i )
434    {
435        tr_webseed * w =
436            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
437        tr_ptrArrayAppend( &t->webseeds, w );
438    }
439
440    return t;
441}
442
443
444static int bandwidthPulse ( void * vmgr );
445static int rechokePulse   ( void * vmgr );
446static int reconnectPulse ( void * vmgr );
447static int refillUpkeep   ( void * vmgr );
448
449tr_peerMgr*
450tr_peerMgrNew( tr_session * session )
451{
452    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
453    m->session = session;
454    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
455fprintf( stderr, "sizeof(tr_pex) is %d\n", (int)sizeof(tr_pex) );
456    return m;
457}
458
459static void
460deleteTimers( struct tr_peerMgr * m )
461{
462    if( m->bandwidthTimer )
463        tr_timerFree( &m->bandwidthTimer );
464
465    if( m->rechokeTimer )
466        tr_timerFree( &m->rechokeTimer );
467
468    if( m->reconnectTimer )
469        tr_timerFree( &m->reconnectTimer );
470
471    if( m->refillUpkeepTimer )
472        tr_timerFree( &m->refillUpkeepTimer );
473}
474
475void
476tr_peerMgrFree( tr_peerMgr * manager )
477{
478    managerLock( manager );
479
480    deleteTimers( manager );
481
482    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
483     * the item from manager->handshakes, so this is a little roundabout... */
484    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
485        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
486
487    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
488
489    managerUnlock( manager );
490    tr_free( manager );
491}
492
493static int
494clientIsDownloadingFrom( const tr_peer * peer )
495{
496    return peer->clientIsInterested && !peer->clientIsChoked;
497}
498
499static int
500clientIsUploadingTo( const tr_peer * peer )
501{
502    return peer->peerIsInterested && !peer->peerIsChoked;
503}
504
505/***
506****
507***/
508
509tr_bool
510tr_peerMgrPeerIsSeed( const tr_torrent  * tor,
511                      const tr_address  * addr )
512{
513    tr_bool isSeed = FALSE;
514    const Torrent * t = tor->torrentPeers;
515    const struct peer_atom * atom = getExistingAtom( t, addr );
516
517    if( atom )
518        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
519
520    return isSeed;
521}
522
523/****
524*****
525*****  REFILL
526*****
527****/
528
529static void
530assertValidPiece( Torrent * t, tr_piece_index_t piece )
531{
532    assert( t );
533    assert( t->tor );
534    assert( piece < t->tor->info.pieceCount );
535}
536
537static int
538getPieceRequests( Torrent * t, tr_piece_index_t piece )
539{
540    assertValidPiece( t, piece );
541
542    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
543}
544
545static void
546incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
547{
548    assertValidPiece( t, piece );
549
550    if( t->pendingRequestCount == NULL )
551        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
552    t->pendingRequestCount[piece]++;
553}
554
555static void
556decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
557{
558    assertValidPiece( t, piece );
559
560    if( t->pendingRequestCount )
561        t->pendingRequestCount[piece]--;
562}
563
564struct tr_refill_piece
565{
566    tr_priority_t    priority;
567    uint32_t         piece;
568    uint32_t         peerCount;
569    int              random;
570    int              pendingRequestCount;
571    int              missingBlockCount;
572};
573
574static int
575compareRefillPiece( const void * aIn, const void * bIn )
576{
577    const struct tr_refill_piece * a = aIn;
578    const struct tr_refill_piece * b = bIn;
579
580    /* if one piece has a higher priority, it goes first */
581    if( a->priority != b->priority )
582        return a->priority > b->priority ? -1 : 1;
583
584    /* have a per-priority endgame */
585    if( a->pendingRequestCount != b->pendingRequestCount )
586        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
587
588    /* fewer missing pieces goes first */
589    if( a->missingBlockCount != b->missingBlockCount )
590        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
591
592    /* otherwise if one has fewer peers, it goes first */
593    if( a->peerCount != b->peerCount )
594        return a->peerCount < b->peerCount ? -1 : 1;
595
596    /* otherwise go with our random seed */
597    if( a->random != b->random )
598        return a->random < b->random ? -1 : 1;
599
600    return 0;
601}
602
603static tr_piece_index_t *
604getPreferredPieces( Torrent * t, tr_piece_index_t * pieceCount )
605{
606    const tr_torrent  * tor = t->tor;
607    const tr_info     * inf = &tor->info;
608    tr_piece_index_t    i;
609    tr_piece_index_t    poolSize = 0;
610    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
611    int                 peerCount;
612    const tr_peer    ** peers;
613
614    assert( torrentIsLocked( t ) );
615
616    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
617    peerCount = tr_ptrArraySize( &t->peers );
618
619    /* make a list of the pieces that we want but don't have */
620    for( i = 0; i < inf->pieceCount; ++i )
621        if( !tor->info.pieces[i].dnd
622                && !tr_cpPieceIsComplete( &tor->completion, i ) )
623            pool[poolSize++] = i;
624
625    /* sort the pool by which to request next */
626    if( poolSize > 1 )
627    {
628        tr_piece_index_t j;
629        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
630
631        for( j = 0; j < poolSize; ++j )
632        {
633            int k;
634            const tr_piece_index_t piece = pool[j];
635            struct tr_refill_piece * setme = p + j;
636
637            setme->piece = piece;
638            setme->priority = inf->pieces[piece].priority;
639            setme->peerCount = 0;
640            setme->random = tr_cryptoWeakRandInt( INT_MAX );
641            setme->pendingRequestCount = getPieceRequests( t, piece );
642            setme->missingBlockCount
643                         = tr_cpMissingBlocksInPiece( &tor->completion, piece );
644
645            for( k = 0; k < peerCount; ++k )
646            {
647                const tr_peer * peer = peers[k];
648                if( peer->peerIsInterested
649                        && !peer->clientIsChoked
650                        && tr_bitfieldHas( peer->have, piece ) )
651                    ++setme->peerCount;
652            }
653        }
654
655        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
656               compareRefillPiece );
657
658        for( j = 0; j < poolSize; ++j )
659            pool[j] = p[j].piece;
660
661        tr_free( p );
662    }
663
664    *pieceCount = poolSize;
665    return pool;
666}
667
668static struct tr_blockIterator*
669blockIteratorNew( Torrent * t )
670{
671    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
672    i->expirationDate = time( NULL ) + PIECE_LIST_SHELF_LIFE_SECS;
673    i->t = t;
674    i->pieces = getPreferredPieces( t, &i->pieceCount );
675    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCountInPiece );
676    tordbg( t, "creating new refill queue.. it contains %"PRIu32" pieces", i->pieceCount );
677    return i;
678}
679
680static tr_bool
681blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
682{
683    tr_bool found;
684    Torrent * t = i->t;
685    tr_torrent * tor = t->tor;
686
687    while( ( i->blockIndex == i->blockCount )
688        && ( i->pieceIndex < i->pieceCount ) )
689    {
690        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
691        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
692        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
693        tr_block_index_t block;
694
695        assert( index < tor->info.pieceCount );
696
697        i->blockCount = 0;
698        i->blockIndex = 0;
699        for( block=b; block!=e; ++block )
700            if( !tr_cpBlockIsCompleteFast( &tor->completion, block ) )
701                i->blocks[i->blockCount++] = block;
702    }
703
704    assert( i->blockCount <= tor->blockCountInPiece );
705
706    if(( found = ( i->blockIndex < i->blockCount )))
707        *setme = i->blocks[i->blockIndex++];
708
709    return found;
710}
711
712static void
713blockIteratorSkipCurrentPiece( struct tr_blockIterator * i )
714{
715    i->blockIndex = i->blockCount;
716}
717
718static void
719blockIteratorFree( struct tr_blockIterator ** inout )
720{
721    struct tr_blockIterator * it = *inout;
722
723    if( it != NULL )
724    {
725        tr_free( it->blocks );
726        tr_free( it->pieces );
727        tr_free( it );
728    }
729
730    *inout = NULL;
731}
732
733static tr_peer**
734getPeersUploadingToClient( Torrent * t,
735                           int *     setmeCount )
736{
737    int j;
738    int peerCount = 0;
739    int retCount = 0;
740    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
741    tr_peer ** ret = tr_new( tr_peer *, peerCount );
742
743    j = 0; /* this is a temporary test to make sure we walk through all the peers */
744    if( peerCount )
745    {
746        /* Get a list of peers we're downloading from.
747           Pick a different starting point each time so all peers
748           get a chance at being the first in line */
749        const int fencepost = tr_cryptoWeakRandInt( peerCount );
750        int i = fencepost;
751        do {
752            if( clientIsDownloadingFrom( peers[i] ) )
753                ret[retCount++] = peers[i];
754            i = ( i + 1 ) % peerCount;
755            ++j;
756        } while( i != fencepost );
757    }
758    assert( j == peerCount );
759    *setmeCount = retCount;
760    return ret;
761}
762
763static uint32_t
764getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
765{
766    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
767    const uint64_t blockPos = tor->blockSize * b;
768    assert( blockPos >= piecePos );
769    return (uint32_t)( blockPos - piecePos );
770}
771
772static int
773refillUpkeep( void * vmgr )
774{
775    tr_torrent * tor = NULL;
776    tr_peerMgr * mgr = vmgr;
777    time_t now;
778    managerLock( mgr );
779
780    now = time( NULL );
781    while(( tor = tr_torrentNext( mgr->session, tor ))) {
782        Torrent * t = tor->torrentPeers;
783        if( t && t->refillQueue && ( t->refillQueue->expirationDate <= now ) ) {
784            tordbg( t, "refill queue is past its shelf date; discarding." );
785            blockIteratorFree( &t->refillQueue );
786        }
787    }
788
789    managerUnlock( mgr );
790    return TRUE;
791}
792
793static void
794refillPulse( int fd UNUSED, short type UNUSED, void * vtorrent )
795{
796    tr_block_index_t block;
797    int peerCount;
798    int webseedCount;
799    tr_peer ** peers;
800    tr_webseed ** webseeds;
801    Torrent * t = vtorrent;
802    tr_torrent * tor = t->tor;
803    tr_bool hasNext = TRUE;
804
805    if( !t->isRunning )
806        return;
807    if( tr_torrentIsSeed( t->tor ) )
808        return;
809
810    torrentLock( t );
811    tordbg( t, "Refilling Request Buffers..." );
812
813    if( t->refillQueue == NULL )
814        t->refillQueue = blockIteratorNew( t );
815
816    peers = getPeersUploadingToClient( t, &peerCount );
817    webseedCount = tr_ptrArraySize( &t->webseeds );
818    webseeds = tr_memdup( tr_ptrArrayBase( &t->webseeds ),
819                          webseedCount * sizeof( tr_webseed* ) );
820
821    while( ( webseedCount || peerCount )
822        && (( hasNext = blockIteratorNext( t->refillQueue, &block ))) )
823    {
824        int j;
825        tr_bool handled = FALSE;
826
827        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
828        const uint32_t offset = getBlockOffsetInPiece( tor, block );
829        const uint32_t length = tr_torBlockCountBytes( tor, block );
830
831        assert( block < tor->blockCount );
832
833        /* find a peer who can ask for this block */
834        for( j=0; !handled && j<peerCount; )
835        {
836            const tr_addreq_t val = tr_peerMsgsAddRequest( peers[j]->msgs, index, offset, length );
837            switch( val )
838            {
839                case TR_ADDREQ_FULL:
840                case TR_ADDREQ_CLIENT_CHOKED:
841                    peers[j] = peers[--peerCount];
842                    break;
843
844                case TR_ADDREQ_MISSING:
845                case TR_ADDREQ_DUPLICATE:
846                    ++j;
847                    break;
848
849                case TR_ADDREQ_OK:
850                    incrementPieceRequests( t, index );
851                    handled = TRUE;
852                    break;
853
854                default:
855                    assert( 0 && "unhandled value" );
856                    break;
857            }
858        }
859
860        /* maybe one of the webseeds can do it */
861        for( j=0; !handled && j<webseedCount; )
862        {
863            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j], index, offset, length );
864            switch( val )
865            {
866                case TR_ADDREQ_FULL:
867                    webseeds[j] = webseeds[--webseedCount];
868                    break;
869
870                case TR_ADDREQ_OK:
871                    incrementPieceRequests( t, index );
872                    handled = TRUE;
873                    break;
874
875                default:
876                    assert( 0 && "unhandled value" );
877                    break;
878            }
879        }
880
881        if( !handled )
882            blockIteratorSkipCurrentPiece( t->refillQueue );
883    }
884
885    /* cleanup */
886    tr_free( webseeds );
887    tr_free( peers );
888
889    if( !hasNext ) {
890        tordbg( t, "refill queue has no more blocks to request... freeing (webseed count: %d, peer count: %d)", webseedCount, peerCount );
891        blockIteratorFree( &t->refillQueue );
892    }
893
894    torrentUnlock( t );
895}
896
897static void
898broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
899{
900    size_t i;
901    size_t peerCount;
902    tr_peer ** peers;
903
904    assert( torrentIsLocked( t ) );
905
906    tordbg( t, "got a block; cancelling any duplicate requests from peers %"PRIu32":%"PRIu32"->%"PRIu32, index, offset, length );
907
908    peerCount = tr_ptrArraySize( &t->peers );
909    peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
910    for( i=0; i<peerCount; ++i )
911        if( peers[i]->msgs )
912            tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
913}
914
915static void
916addStrike( Torrent * t, tr_peer * peer )
917{
918    tordbg( t, "increasing peer %s strike count to %d",
919            tr_peerIoAddrStr( &peer->addr,
920                              peer->port ), peer->strikes + 1 );
921
922    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
923    {
924        struct peer_atom * atom = peer->atom;
925        atom->myflags |= MYFLAG_BANNED;
926        peer->doPurge = 1;
927        tordbg( t, "banning peer %s", tr_peerIoAddrStr( &atom->addr, atom->port ) );
928    }
929}
930
931static void
932gotBadPiece( Torrent *        t,
933             tr_piece_index_t pieceIndex )
934{
935    tr_torrent *   tor = t->tor;
936    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
937
938    tor->corruptCur += byteCount;
939    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
940}
941
942static void
943refillSoon( Torrent * t )
944{
945    if( !evtimer_pending( &t->refillTimer, NULL ) )
946        tr_timerAdd( &t->refillTimer, 0, REFILL_PERIOD_MSEC );
947}
948
949static void
950peerSuggestedPiece( Torrent            * t UNUSED,
951                    tr_peer            * peer UNUSED,
952                    tr_piece_index_t     pieceIndex UNUSED,
953                    int                  isFastAllowed UNUSED )
954{
955#if 0
956    assert( t );
957    assert( peer );
958    assert( peer->msgs );
959
960    /* is this a valid piece? */
961    if(  pieceIndex >= t->tor->info.pieceCount )
962        return;
963
964    /* don't ask for it if we've already got it */
965    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
966        return;
967
968    /* don't ask for it if they don't have it */
969    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
970        return;
971
972    /* don't ask for it if we're choked and it's not fast */
973    if( !isFastAllowed && peer->clientIsChoked )
974        return;
975
976    /* request the blocks that we don't have in this piece */
977    {
978        tr_block_index_t block;
979        const tr_torrent * tor = t->tor;
980        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
981        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
982
983        for( block=start; block<end; ++block )
984        {
985            if( !tr_cpBlockIsComplete( tor->completion, block ) )
986            {
987                const uint32_t offset = getBlockOffsetInPiece( tor, block );
988                const uint32_t length = tr_torBlockCountBytes( tor, block );
989                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
990                incrementPieceRequests( t, pieceIndex );
991            }
992        }
993    }
994#endif
995}
996
997static void
998peerCallbackFunc( void * vpeer, void * vevent, void * vt )
999{
1000    tr_peer * peer = vpeer; /* may be NULL if peer is a webseed */
1001    Torrent * t = vt;
1002    const tr_peer_event * e = vevent;
1003
1004    torrentLock( t );
1005
1006    switch( e->eventType )
1007    {
1008        case TR_PEER_UPLOAD_ONLY:
1009            /* update our atom */
1010            if( peer ) {
1011                if( e->uploadOnly ) {
1012                    peer->atom->uploadOnly = UPLOAD_ONLY_YES;
1013                    peer->atom->flags |= ADDED_F_SEED_FLAG;
1014                } else {
1015                    peer->atom->uploadOnly = UPLOAD_ONLY_NO;
1016                    peer->atom->flags &= ~ADDED_F_SEED_FLAG;
1017                }
1018            }
1019            break;
1020
1021        case TR_PEER_NEED_REQ:
1022            refillSoon( t );
1023            break;
1024
1025        case TR_PEER_CANCEL:
1026            decrementPieceRequests( t, e->pieceIndex );
1027            break;
1028
1029        case TR_PEER_PEER_GOT_DATA:
1030        {
1031            const time_t now = time( NULL );
1032            tr_torrent * tor = t->tor;
1033
1034            tr_torrentSetActivityDate( tor, now );
1035
1036            if( e->wasPieceData ) {
1037                tor->uploadedCur += e->length;
1038                tr_torrentSetDirty( tor );
1039            }
1040
1041            /* update the stats */
1042            if( e->wasPieceData )
1043                tr_statsAddUploaded( tor->session, e->length );
1044
1045            /* update our atom */
1046            if( peer && e->wasPieceData )
1047                peer->atom->piece_data_time = now;
1048
1049            tor->needsSeedRatioCheck = TRUE;
1050
1051            break;
1052        }
1053
1054        case TR_PEER_CLIENT_GOT_SUGGEST:
1055            if( peer )
1056                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1057            break;
1058
1059        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1060            if( peer )
1061                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1062            break;
1063
1064        case TR_PEER_CLIENT_GOT_DATA:
1065        {
1066            const time_t now = time( NULL );
1067            tr_torrent * tor = t->tor;
1068
1069            tr_torrentSetActivityDate( tor, now );
1070
1071            /* only add this to downloadedCur if we got it from a peer --
1072             * webseeds shouldn't count against our ratio.  As one tracker
1073             * admin put it, "Those pieces are downloaded directly from the
1074             * content distributor, not the peers, it is the tracker's job
1075             * to manage the swarms, not the web server and does not fit
1076             * into the jurisdiction of the tracker." */
1077            if( peer && e->wasPieceData ) {
1078                tor->downloadedCur += e->length;
1079                tr_torrentSetDirty( tor );
1080            }
1081
1082            /* update the stats */
1083            if( e->wasPieceData )
1084                tr_statsAddDownloaded( tor->session, e->length );
1085
1086            /* update our atom */
1087            if( peer && e->wasPieceData )
1088                peer->atom->piece_data_time = now;
1089
1090            break;
1091        }
1092
1093        case TR_PEER_PEER_PROGRESS:
1094        {
1095            if( peer )
1096            {
1097                struct peer_atom * atom = peer->atom;
1098                if( e->progress >= 1.0 ) {
1099                    tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1100                    atom->flags |= ADDED_F_SEED_FLAG;
1101                }
1102            }
1103            break;
1104        }
1105
1106        case TR_PEER_CLIENT_GOT_BLOCK:
1107        {
1108            tr_torrent * tor = t->tor;
1109
1110            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1111
1112            tr_cpBlockAdd( &tor->completion, block );
1113            tr_torrentSetDirty( tor );
1114            decrementPieceRequests( t, e->pieceIndex );
1115
1116            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1117
1118            if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1119            {
1120                const tr_piece_index_t p = e->pieceIndex;
1121                const tr_bool ok = tr_ioTestPiece( tor, p, NULL, 0 );
1122
1123                if( !ok )
1124                {
1125                    tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1126                               (unsigned long)p );
1127                }
1128
1129                tr_torrentSetHasPiece( tor, p, ok );
1130                tr_torrentSetPieceChecked( tor, p, TRUE );
1131                tr_peerMgrSetBlame( tor, p, ok );
1132
1133                if( !ok )
1134                {
1135                    gotBadPiece( t, p );
1136                }
1137                else
1138                {
1139                    int i;
1140                    int peerCount;
1141                    tr_peer ** peers;
1142                    tr_file_index_t fileIndex;
1143
1144                    peerCount = tr_ptrArraySize( &t->peers );
1145                    peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1146                    for( i=0; i<peerCount; ++i )
1147                        tr_peerMsgsHave( peers[i]->msgs, p );
1148
1149                    for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex )
1150                    {
1151                        const tr_file * file = &tor->info.files[fileIndex];
1152                        if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) && tr_cpFileIsComplete( &tor->completion, fileIndex ) )
1153                        {
1154                            char * path = tr_buildPath( tor->downloadDir, file->name, NULL );
1155                            tordbg( t, "closing recently-completed file \"%s\"", path );
1156                            tr_fdFileClose( path );
1157                            tr_free( path );
1158                        }
1159                    }
1160                }
1161
1162                tr_torrentRecheckCompleteness( tor );
1163            }
1164            break;
1165        }
1166
1167        case TR_PEER_ERROR:
1168            if( ( e->err == ERANGE ) || ( e->err == EMSGSIZE ) || ( e->err == ENOTCONN ) )
1169            {
1170                /* some protocol error from the peer */
1171                peer->doPurge = 1;
1172                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
1173                        tr_peerIoAddrStr( &peer->addr, peer->port ) );
1174            }
1175            else 
1176            {
1177                tordbg( t, "unhandled error: %s", tr_strerror( e->err ) );
1178            }
1179            break;
1180
1181        default:
1182            assert( 0 );
1183    }
1184
1185    torrentUnlock( t );
1186}
1187
1188static void
1189ensureAtomExists( Torrent          * t,
1190                  const tr_address * addr,
1191                  tr_port            port,
1192                  uint8_t            flags,
1193                  uint8_t            from )
1194{
1195    if( getExistingAtom( t, addr ) == NULL )
1196    {
1197        struct peer_atom * a;
1198        a = tr_new0( struct peer_atom, 1 );
1199        a->addr = *addr;
1200        a->port = port;
1201        a->flags = flags;
1202        a->from = from;
1203        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr( &a->addr, a->port ) );
1204        tr_ptrArrayInsertSorted( &t->pool, a, comparePeerAtoms );
1205    }
1206}
1207
1208static int
1209getMaxPeerCount( const tr_torrent * tor )
1210{
1211    return tor->maxConnectedPeers;
1212}
1213
1214static int
1215getPeerCount( const Torrent * t )
1216{
1217    return tr_ptrArraySize( &t->peers );/* + tr_ptrArraySize( &t->outgoingHandshakes ); */
1218}
1219
1220/* FIXME: this is kind of a mess. */
1221static tr_bool
1222myHandshakeDoneCB( tr_handshake  * handshake,
1223                   tr_peerIo     * io,
1224                   tr_bool         isConnected,
1225                   const uint8_t * peer_id,
1226                   void          * vmanager )
1227{
1228    tr_bool            ok = isConnected;
1229    tr_bool            success = FALSE;
1230    tr_port            port;
1231    const tr_address * addr;
1232    tr_peerMgr       * manager = vmanager;
1233    Torrent          * t;
1234    tr_handshake     * ours;
1235
1236    assert( io );
1237    assert( tr_isBool( ok ) );
1238
1239    t = tr_peerIoHasTorrentHash( io )
1240        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1241        : NULL;
1242
1243    if( tr_peerIoIsIncoming ( io ) )
1244        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1245                                        handshake, handshakeCompare );
1246    else if( t )
1247        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1248                                        handshake, handshakeCompare );
1249    else
1250        ours = handshake;
1251
1252    assert( ours );
1253    assert( ours == handshake );
1254
1255    if( t )
1256        torrentLock( t );
1257
1258    addr = tr_peerIoGetAddress( io, &port );
1259
1260    if( !ok || !t || !t->isRunning )
1261    {
1262        if( t )
1263        {
1264            struct peer_atom * atom = getExistingAtom( t, addr );
1265            if( atom )
1266                ++atom->numFails;
1267        }
1268    }
1269    else /* looking good */
1270    {
1271        struct peer_atom * atom;
1272        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1273        atom = getExistingAtom( t, addr );
1274        atom->time = time( NULL );
1275        atom->piece_data_time = 0;
1276
1277        if( atom->myflags & MYFLAG_BANNED )
1278        {
1279            tordbg( t, "banned peer %s tried to reconnect",
1280                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
1281        }
1282        else if( tr_peerIoIsIncoming( io )
1283               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1284
1285        {
1286        }
1287        else
1288        {
1289            tr_peer * peer = getExistingPeer( t, addr );
1290
1291            if( peer ) /* we already have this peer */
1292            {
1293            }
1294            else
1295            {
1296                peer = getPeer( t, addr );
1297                tr_free( peer->client );
1298
1299                if( !peer_id )
1300                    peer->client = NULL;
1301                else {
1302                    char client[128];
1303                    tr_clientForId( client, sizeof( client ), peer_id );
1304                    peer->client = tr_strdup( client );
1305                }
1306
1307                peer->port = port;
1308                peer->atom = atom;
1309                peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is
1310                                                                balanced by our unref in peerDestructor()  */
1311                tr_peerIoSetParent( peer->io, t->tor->bandwidth );
1312                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1313
1314                success = TRUE;
1315            }
1316        }
1317    }
1318
1319    if( t )
1320        torrentUnlock( t );
1321
1322    return success;
1323}
1324
1325void
1326tr_peerMgrAddIncoming( tr_peerMgr * manager,
1327                       tr_address * addr,
1328                       tr_port      port,
1329                       int          socket )
1330{
1331    managerLock( manager );
1332
1333    if( tr_sessionIsAddressBlocked( manager->session, addr ) )
1334    {
1335        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
1336        tr_netClose( socket );
1337    }
1338    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
1339    {
1340        tr_netClose( socket );
1341    }
1342    else /* we don't have a connetion to them yet... */
1343    {
1344        tr_peerIo *    io;
1345        tr_handshake * handshake;
1346
1347        io = tr_peerIoNewIncoming( manager->session, manager->session->bandwidth, addr, port, socket );
1348
1349        handshake = tr_handshakeNew( io,
1350                                     manager->session->encryptionMode,
1351                                     myHandshakeDoneCB,
1352                                     manager );
1353
1354        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
1355
1356        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
1357                                 handshakeCompare );
1358    }
1359
1360    managerUnlock( manager );
1361}
1362
1363static tr_bool
1364tr_isPex( const tr_pex * pex )
1365{
1366    return pex && tr_isAddress( &pex->addr );
1367}
1368
1369void
1370tr_peerMgrAddPex( tr_torrent   *  tor,
1371                  uint8_t         from,
1372                  const tr_pex *  pex )
1373{
1374    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
1375    {
1376        Torrent * t = tor->torrentPeers;
1377        managerLock( t->manager );
1378
1379        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
1380            if( tr_isValidPeerAddress( &pex->addr, pex->port ) )
1381                ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from );
1382
1383        managerUnlock( t->manager );
1384    }
1385}
1386
1387tr_pex *
1388tr_peerMgrCompactToPex( const void *    compact,
1389                        size_t          compactLen,
1390                        const uint8_t * added_f,
1391                        size_t          added_f_len,
1392                        size_t *        pexCount )
1393{
1394    size_t          i;
1395    size_t          n = compactLen / 6;
1396    const uint8_t * walk = compact;
1397    tr_pex *        pex = tr_new0( tr_pex, n );
1398
1399    for( i = 0; i < n; ++i )
1400    {
1401        pex[i].addr.type = TR_AF_INET;
1402        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
1403        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1404        if( added_f && ( n == added_f_len ) )
1405            pex[i].flags = added_f[i];
1406    }
1407
1408    *pexCount = n;
1409    return pex;
1410}
1411
1412tr_pex *
1413tr_peerMgrCompact6ToPex( const void    * compact,
1414                         size_t          compactLen,
1415                         const uint8_t * added_f,
1416                         size_t          added_f_len,
1417                         size_t        * pexCount )
1418{
1419    size_t          i;
1420    size_t          n = compactLen / 18;
1421    const uint8_t * walk = compact;
1422    tr_pex *        pex = tr_new0( tr_pex, n );
1423
1424    for( i = 0; i < n; ++i )
1425    {
1426        pex[i].addr.type = TR_AF_INET6;
1427        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
1428        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1429        if( added_f && ( n == added_f_len ) )
1430            pex[i].flags = added_f[i];
1431    }
1432
1433    *pexCount = n;
1434    return pex;
1435}
1436
1437tr_pex *
1438tr_peerMgrArrayToPex( const void * array,
1439                      size_t       arrayLen,
1440                      size_t      * pexCount )
1441{
1442    size_t          i;
1443    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
1444    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
1445    const uint8_t * walk = array;
1446    tr_pex        * pex = tr_new0( tr_pex, n );
1447
1448    for( i = 0 ; i < n ; i++ ) {
1449        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
1450        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
1451        pex[i].flags = 0x00;
1452        walk += sizeof( tr_address ) + 2;
1453    }
1454
1455    *pexCount = n;
1456    return pex;
1457}
1458
1459/**
1460***
1461**/
1462
1463void
1464tr_peerMgrSetBlame( tr_torrent     * tor,
1465                    tr_piece_index_t pieceIndex,
1466                    int              success )
1467{
1468    if( !success )
1469    {
1470        int        peerCount, i;
1471        Torrent *  t = tor->torrentPeers;
1472        tr_peer ** peers;
1473
1474        assert( torrentIsLocked( t ) );
1475
1476        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
1477        for( i = 0; i < peerCount; ++i )
1478        {
1479            tr_peer * peer = peers[i];
1480            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1481            {
1482                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1483                        tr_peerIoAddrStr( &peer->addr, peer->port ),
1484                        pieceIndex, (int)peer->strikes + 1 );
1485                addStrike( t, peer );
1486            }
1487        }
1488    }
1489}
1490
1491int
1492tr_pexCompare( const void * va, const void * vb )
1493{
1494    const tr_pex * a = va;
1495    const tr_pex * b = vb;
1496    int i;
1497
1498    assert( tr_isPex( a ) );
1499    assert( tr_isPex( b ) );
1500
1501    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
1502        return i;
1503
1504    if( a->port != b->port )
1505        return a->port < b->port ? -1 : 1;
1506
1507    return 0;
1508}
1509
1510#if 0
1511static int
1512peerPrefersCrypto( const tr_peer * peer )
1513{
1514    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1515        return TRUE;
1516
1517    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1518        return FALSE;
1519
1520    return tr_peerIoIsEncrypted( peer->io );
1521}
1522#endif
1523
1524int
1525tr_peerMgrGetPeers( tr_torrent * tor, tr_pex ** setme_pex, uint8_t af )
1526{
1527    int count = 0;
1528    const Torrent * t = tor->torrentPeers;
1529
1530    managerLock( t->manager );
1531
1532    {
1533        int i;
1534        const struct peer_atom ** atoms = (const struct peer_atom**) tr_ptrArrayBase( &t->pool );
1535        const int atomCount = tr_ptrArraySize( &t->pool );
1536        /* for now, this will waste memory on torrents that have both
1537         * ipv6 and ipv4 peers */
1538        tr_pex * pex = tr_new0( tr_pex, atomCount );
1539        tr_pex * walk = pex;
1540
1541        for( i=0; i<atomCount; ++i )
1542        {
1543            const struct peer_atom * atom = atoms[i];
1544            if( atom->addr.type == af )
1545            {
1546                assert( tr_isAddress( &atom->addr ) );
1547                walk->addr = atom->addr;
1548                walk->port = atom->port;
1549                walk->flags = atom->flags;
1550                ++count;
1551                ++walk;
1552            }
1553        }
1554
1555        assert( ( walk - pex ) == count );
1556        qsort( pex, count, sizeof( tr_pex ), tr_pexCompare );
1557        *setme_pex = pex;
1558    }
1559
1560    managerUnlock( t->manager );
1561    return count;
1562}
1563
1564static void
1565ensureMgrTimersExist( struct tr_peerMgr * m )
1566{
1567    tr_session * s = m->session;
1568
1569    if( m->bandwidthTimer == NULL )
1570        m->bandwidthTimer = tr_timerNew( s, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
1571
1572    if( m->rechokeTimer == NULL )
1573        m->rechokeTimer = tr_timerNew( s, rechokePulse, m, RECHOKE_PERIOD_MSEC );
1574
1575    if( m->reconnectTimer == NULL )
1576        m->reconnectTimer = tr_timerNew( s, reconnectPulse, m, RECONNECT_PERIOD_MSEC );
1577
1578    if( m->refillUpkeepTimer == NULL )
1579        m->refillUpkeepTimer = tr_timerNew( s, refillUpkeep, m, REFILL_UPKEEP_PERIOD_MSEC );
1580}
1581
1582void
1583tr_peerMgrStartTorrent( tr_torrent * tor )
1584{
1585    Torrent * t = tor->torrentPeers;
1586
1587    assert( t != NULL );
1588    managerLock( t->manager );
1589    ensureMgrTimersExist( t->manager );
1590
1591    if( !t->isRunning )
1592    {
1593        t->isRunning = TRUE;
1594
1595        if( !tr_ptrArrayEmpty( &t->webseeds ) )
1596            refillSoon( t );
1597    }
1598
1599    rechokePulse( t->manager );
1600    managerUnlock( t->manager );
1601}
1602
1603static void
1604stopTorrent( Torrent * t )
1605{
1606    assert( torrentIsLocked( t ) );
1607
1608    t->isRunning = FALSE;
1609
1610    /* disconnect the peers. */
1611    tr_ptrArrayForeach( &t->peers, (PtrArrayForeachFunc)peerDestructor );
1612    tr_ptrArrayClear( &t->peers );
1613
1614    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1615     * which removes the handshake from t->outgoingHandshakes... */
1616    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
1617        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
1618}
1619
1620void
1621tr_peerMgrStopTorrent( tr_torrent * tor )
1622{
1623    Torrent * t = tor->torrentPeers;
1624
1625    managerLock( t->manager );
1626
1627    stopTorrent( t );
1628
1629    managerUnlock( t->manager );
1630}
1631
1632void
1633tr_peerMgrAddTorrent( tr_peerMgr * manager,
1634                      tr_torrent * tor )
1635{
1636    managerLock( manager );
1637
1638    assert( tor );
1639    assert( tor->torrentPeers == NULL );
1640
1641    tor->torrentPeers = torrentConstructor( manager, tor );
1642
1643    managerUnlock( manager );
1644}
1645
1646void
1647tr_peerMgrRemoveTorrent( tr_torrent * tor )
1648{
1649    tr_torrentLock( tor );
1650
1651    stopTorrent( tor->torrentPeers );
1652    torrentDestructor( tor->torrentPeers );
1653
1654    tr_torrentUnlock( tor );
1655}
1656
1657void
1658tr_peerMgrTorrentAvailability( const tr_torrent * tor,
1659                               int8_t           * tab,
1660                               unsigned int       tabCount )
1661{
1662    tr_piece_index_t   i;
1663    const Torrent *    t;
1664    float              interval;
1665    tr_bool            isSeed;
1666    int                peerCount;
1667    const tr_peer **   peers;
1668    tr_torrentLock( tor );
1669
1670    t = tor->torrentPeers;
1671    tor = t->tor;
1672    interval = tor->info.pieceCount / (float)tabCount;
1673    isSeed = tor && ( tr_cpGetStatus ( &tor->completion ) == TR_SEED );
1674    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1675    peerCount = tr_ptrArraySize( &t->peers );
1676
1677    memset( tab, 0, tabCount );
1678
1679    for( i = 0; tor && i < tabCount; ++i )
1680    {
1681        const int piece = i * interval;
1682
1683        if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
1684            tab[i] = -1;
1685        else if( peerCount ) {
1686            int j;
1687            for( j = 0; j < peerCount; ++j )
1688                if( tr_bitfieldHas( peers[j]->have, i ) )
1689                    ++tab[i];
1690        }
1691    }
1692
1693    tr_torrentUnlock( tor );
1694}
1695
1696/* Returns the pieces that are available from peers */
1697tr_bitfield*
1698tr_peerMgrGetAvailable( const tr_torrent * tor )
1699{
1700    int i;
1701    int peerCount;
1702    Torrent * t = tor->torrentPeers;
1703    const tr_peer ** peers;
1704    tr_bitfield * pieces;
1705    managerLock( t->manager );
1706
1707    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1708    peerCount = tr_ptrArraySize( &t->peers );
1709    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1710    for( i=0; i<peerCount; ++i )
1711        tr_bitfieldOr( pieces, peers[i]->have );
1712
1713    managerUnlock( t->manager );
1714    return pieces;
1715}
1716
1717void
1718tr_peerMgrTorrentStats( tr_torrent       * tor,
1719                        int              * setmePeersKnown,
1720                        int              * setmePeersConnected,
1721                        int              * setmeSeedsConnected,
1722                        int              * setmeWebseedsSendingToUs,
1723                        int              * setmePeersSendingToUs,
1724                        int              * setmePeersGettingFromUs,
1725                        int              * setmePeersFrom )
1726{
1727    int i, size;
1728    const Torrent * t = tor->torrentPeers;
1729    const tr_peer ** peers;
1730    const tr_webseed ** webseeds;
1731
1732    managerLock( t->manager );
1733
1734    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1735    size = tr_ptrArraySize( &t->peers );
1736
1737    *setmePeersKnown           = tr_ptrArraySize( &t->pool );
1738    *setmePeersConnected       = 0;
1739    *setmeSeedsConnected       = 0;
1740    *setmePeersGettingFromUs   = 0;
1741    *setmePeersSendingToUs     = 0;
1742    *setmeWebseedsSendingToUs  = 0;
1743
1744    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1745        setmePeersFrom[i] = 0;
1746
1747    for( i=0; i<size; ++i )
1748    {
1749        const tr_peer * peer = peers[i];
1750        const struct peer_atom * atom = peer->atom;
1751
1752        if( peer->io == NULL ) /* not connected */
1753            continue;
1754
1755        ++*setmePeersConnected;
1756
1757        ++setmePeersFrom[atom->from];
1758
1759        if( clientIsDownloadingFrom( peer ) )
1760            ++*setmePeersSendingToUs;
1761
1762        if( clientIsUploadingTo( peer ) )
1763            ++*setmePeersGettingFromUs;
1764
1765        if( atom->flags & ADDED_F_SEED_FLAG )
1766            ++*setmeSeedsConnected;
1767    }
1768
1769    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1770    size = tr_ptrArraySize( &t->webseeds );
1771    for( i=0; i<size; ++i )
1772        if( tr_webseedIsActive( webseeds[i] ) )
1773            ++*setmeWebseedsSendingToUs;
1774
1775    managerUnlock( t->manager );
1776}
1777
1778float
1779tr_peerMgrGetWebseedSpeed( const tr_torrent * tor, uint64_t now )
1780{
1781    int i;
1782    float tmp;
1783    float ret = 0;
1784
1785    const Torrent * t = tor->torrentPeers;
1786    const int n = tr_ptrArraySize( &t->webseeds );
1787    const tr_webseed ** webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1788
1789    for( i=0; i<n; ++i )
1790        if( tr_webseedGetSpeed( webseeds[i], now, &tmp ) )
1791            ret += tmp;
1792
1793    return ret;
1794}
1795
1796
1797float*
1798tr_peerMgrWebSpeeds( const tr_torrent * tor )
1799{
1800    const Torrent * t = tor->torrentPeers;
1801    const tr_webseed ** webseeds;
1802    int i;
1803    int webseedCount;
1804    float * ret;
1805    uint64_t now;
1806
1807    assert( t->manager );
1808    managerLock( t->manager );
1809
1810    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1811    webseedCount = tr_ptrArraySize( &t->webseeds );
1812    assert( webseedCount == tor->info.webseedCount );
1813    ret = tr_new0( float, webseedCount );
1814    now = tr_date( );
1815
1816    for( i=0; i<webseedCount; ++i )
1817        if( !tr_webseedGetSpeed( webseeds[i], now, &ret[i] ) )
1818            ret[i] = -1.0;
1819
1820    managerUnlock( t->manager );
1821    return ret;
1822}
1823
1824double
1825tr_peerGetPieceSpeed( const tr_peer * peer, uint64_t now, tr_direction direction )
1826{
1827    return peer->io ? tr_peerIoGetPieceSpeed( peer->io, now, direction ) : 0.0;
1828}
1829
1830
1831struct tr_peer_stat *
1832tr_peerMgrPeerStats( const tr_torrent    * tor,
1833                     int                 * setmeCount )
1834{
1835    int i, size;
1836    const Torrent * t = tor->torrentPeers;
1837    const tr_peer ** peers;
1838    tr_peer_stat * ret;
1839    uint64_t now;
1840
1841    assert( t->manager );
1842    managerLock( t->manager );
1843
1844    size = tr_ptrArraySize( &t->peers );
1845    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1846    ret = tr_new0( tr_peer_stat, size );
1847    now = tr_date( );
1848
1849    for( i=0; i<size; ++i )
1850    {
1851        char *                   pch;
1852        const tr_peer *          peer = peers[i];
1853        const struct peer_atom * atom = peer->atom;
1854        tr_peer_stat *           stat = ret + i;
1855
1856        tr_ntop( &peer->addr, stat->addr, sizeof( stat->addr ) );
1857        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
1858                   sizeof( stat->client ) );
1859        stat->port               = ntohs( peer->port );
1860        stat->from               = atom->from;
1861        stat->progress           = peer->progress;
1862        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1863        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER );
1864        stat->rateToClient       = tr_peerGetPieceSpeed( peer, now, TR_PEER_TO_CLIENT );
1865        stat->peerIsChoked       = peer->peerIsChoked;
1866        stat->peerIsInterested   = peer->peerIsInterested;
1867        stat->clientIsChoked     = peer->clientIsChoked;
1868        stat->clientIsInterested = peer->clientIsInterested;
1869        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1870        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1871        stat->isUploadingTo      = clientIsUploadingTo( peer );
1872        stat->isSeed             = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
1873
1874        pch = stat->flagStr;
1875        if( t->optimistic == peer ) *pch++ = 'O';
1876        if( stat->isDownloadingFrom ) *pch++ = 'D';
1877        else if( stat->clientIsInterested ) *pch++ = 'd';
1878        if( stat->isUploadingTo ) *pch++ = 'U';
1879        else if( stat->peerIsInterested ) *pch++ = 'u';
1880        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
1881        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1882        if( stat->isEncrypted ) *pch++ = 'E';
1883        if( stat->from == TR_PEER_FROM_DHT ) *pch++ = 'H';
1884        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1885        if( stat->isIncoming ) *pch++ = 'I';
1886        *pch = '\0';
1887    }
1888
1889    *setmeCount = size;
1890
1891    managerUnlock( t->manager );
1892    return ret;
1893}
1894
1895/**
1896***
1897**/
1898
1899struct ChokeData
1900{
1901    tr_bool         doUnchoke;
1902    tr_bool         isInterested;
1903    tr_bool         isChoked;
1904    int             rate;
1905    tr_peer *       peer;
1906};
1907
1908static int
1909compareChoke( const void * va,
1910              const void * vb )
1911{
1912    const struct ChokeData * a = va;
1913    const struct ChokeData * b = vb;
1914
1915    if( a->rate != b->rate ) /* prefer higher overall speeds */
1916        return a->rate > b->rate ? -1 : 1;
1917
1918    if( a->isChoked != b->isChoked ) /* prefer unchoked */
1919        return a->isChoked ? 1 : -1;
1920
1921    return 0;
1922}
1923
1924static int
1925isNew( const tr_peer * peer )
1926{
1927    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1928}
1929
1930static int
1931isSame( const tr_peer * peer )
1932{
1933    return peer && peer->client && strstr( peer->client, "Transmission" );
1934}
1935
1936/**
1937***
1938**/
1939
1940static void
1941rechokeTorrent( Torrent * t, const uint64_t now )
1942{
1943    int i, size, unchokedInterested;
1944    const int peerCount = tr_ptrArraySize( &t->peers );
1945    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1946    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1947    const tr_session * session = t->manager->session;
1948    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
1949
1950    assert( torrentIsLocked( t ) );
1951
1952    /* sort the peers by preference and rate */
1953    for( i = 0, size = 0; i < peerCount; ++i )
1954    {
1955        tr_peer * peer = peers[i];
1956        struct peer_atom * atom = peer->atom;
1957
1958        if( peer->progress >= 1.0 ) /* choke all seeds */
1959        {
1960            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1961        }
1962        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
1963        {
1964            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1965        }
1966        else if( chokeAll ) /* choke everyone if we're not uploading */
1967        {
1968            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1969        }
1970        else
1971        {
1972            struct ChokeData * n = &choke[size++];
1973            n->peer         = peer;
1974            n->isInterested = peer->peerIsInterested;
1975            n->isChoked     = peer->peerIsChoked;
1976            n->rate         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER ) * 1024;
1977        }
1978    }
1979
1980    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
1981
1982    /**
1983     * Reciprocation and number of uploads capping is managed by unchoking
1984     * the N peers which have the best upload rate and are interested.
1985     * This maximizes the client's download rate. These N peers are
1986     * referred to as downloaders, because they are interested in downloading
1987     * from the client.
1988     *
1989     * Peers which have a better upload rate (as compared to the downloaders)
1990     * but aren't interested get unchoked. If they become interested, the
1991     * downloader with the worst upload rate gets choked. If a client has
1992     * a complete file, it uses its upload rate rather than its download
1993     * rate to decide which peers to unchoke.
1994     */
1995    unchokedInterested = 0;
1996    for( i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i ) {
1997        choke[i].doUnchoke = 1;
1998        if( choke[i].isInterested )
1999            ++unchokedInterested;
2000    }
2001
2002    /* optimistic unchoke */
2003    if( i < size )
2004    {
2005        int n;
2006        struct ChokeData * c;
2007        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
2008
2009        for( ; i<size; ++i )
2010        {
2011            if( choke[i].isInterested )
2012            {
2013                const tr_peer * peer = choke[i].peer;
2014                int x = 1, y;
2015                if( isNew( peer ) ) x *= 3;
2016                if( isSame( peer ) ) x *= 3;
2017                for( y=0; y<x; ++y )
2018                    tr_ptrArrayAppend( &randPool, &choke[i] );
2019            }
2020        }
2021
2022        if(( n = tr_ptrArraySize( &randPool )))
2023        {
2024            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
2025            c->doUnchoke = 1;
2026            t->optimistic = c->peer;
2027        }
2028
2029        tr_ptrArrayDestruct( &randPool, NULL );
2030    }
2031
2032    for( i=0; i<size; ++i )
2033        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
2034
2035    /* cleanup */
2036    tr_free( choke );
2037}
2038
2039static int
2040rechokePulse( void * vmgr )
2041{
2042    uint64_t now;
2043    tr_torrent * tor = NULL;
2044    tr_peerMgr * mgr = vmgr;
2045    managerLock( mgr );
2046
2047    now = tr_date( );
2048    while(( tor = tr_torrentNext( mgr->session, tor )))
2049        if( tor->isRunning )
2050            rechokeTorrent( tor->torrentPeers, now );
2051
2052    managerUnlock( mgr );
2053    return TRUE;
2054}
2055
2056/***
2057****
2058****  Life and Death
2059****
2060***/
2061
2062typedef enum
2063{
2064    TR_CAN_KEEP,
2065    TR_CAN_CLOSE,
2066    TR_MUST_CLOSE,
2067}
2068tr_close_type_t;
2069
2070static tr_close_type_t
2071shouldPeerBeClosed( const Torrent    * t,
2072                    const tr_peer    * peer,
2073                    int                peerCount )
2074{
2075    const tr_torrent *       tor = t->tor;
2076    const time_t             now = time( NULL );
2077    const struct peer_atom * atom = peer->atom;
2078
2079    /* if it's marked for purging, close it */
2080    if( peer->doPurge )
2081    {
2082        tordbg( t, "purging peer %s because its doPurge flag is set",
2083                tr_peerIoAddrStr( &atom->addr, atom->port ) );
2084        return TR_MUST_CLOSE;
2085    }
2086
2087    /* if we're seeding and the peer has everything we have,
2088     * and enough time has passed for a pex exchange, then disconnect */
2089    if( tr_torrentIsSeed( tor ) )
2090    {
2091        int peerHasEverything;
2092        if( atom->flags & ADDED_F_SEED_FLAG )
2093            peerHasEverything = TRUE;
2094        else if( peer->progress < tr_cpPercentDone( &tor->completion ) )
2095            peerHasEverything = FALSE;
2096        else {
2097            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( &tor->completion ) );
2098            tr_bitfieldDifference( tmp, peer->have );
2099            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2100            tr_bitfieldFree( tmp );
2101        }
2102
2103        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 )))
2104        {
2105            tordbg( t, "purging peer %s because we're both seeds",
2106                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
2107            return TR_MUST_CLOSE;
2108        }
2109    }
2110
2111    /* disconnect if it's been too long since piece data has been transferred.
2112     * this is on a sliding scale based on number of available peers... */
2113    {
2114        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2115        /* if we have >= relaxIfFewerThan, strictness is 100%.
2116         * if we have zero connections, strictness is 0% */
2117        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
2118                               ? 1.0
2119                               : peerCount / (float)relaxStrictnessIfFewerThanN;
2120        const int lo = MIN_UPLOAD_IDLE_SECS;
2121        const int hi = MAX_UPLOAD_IDLE_SECS;
2122        const int limit = hi - ( ( hi - lo ) * strictness );
2123        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
2124/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
2125        if( idleTime > limit ) {
2126            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
2127                       tr_peerIoAddrStr( &atom->addr, atom->port ), idleTime );
2128            return TR_CAN_CLOSE;
2129        }
2130    }
2131
2132    return TR_CAN_KEEP;
2133}
2134
2135static tr_peer **
2136getPeersToClose( Torrent * t, tr_close_type_t closeType, int * setmeSize )
2137{
2138    int i, peerCount, outsize;
2139    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
2140    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2141
2142    assert( torrentIsLocked( t ) );
2143
2144    for( i = outsize = 0; i < peerCount; ++i )
2145        if( shouldPeerBeClosed( t, peers[i], peerCount ) == closeType )
2146            ret[outsize++] = peers[i];
2147
2148    *setmeSize = outsize;
2149    return ret;
2150}
2151
2152static int
2153compareCandidates( const void * va, const void * vb )
2154{
2155    const struct peer_atom * a = *(const struct peer_atom**) va;
2156    const struct peer_atom * b = *(const struct peer_atom**) vb;
2157
2158    /* <Charles> Here we would probably want to try reconnecting to
2159     * peers that had most recently given us data. Lots of users have
2160     * trouble with resets due to their routers and/or ISPs. This way we
2161     * can quickly recover from an unwanted reset. So we sort
2162     * piece_data_time in descending order.
2163     */
2164
2165    if( a->piece_data_time != b->piece_data_time )
2166        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2167
2168    if( a->numFails != b->numFails )
2169        return a->numFails < b->numFails ? -1 : 1;
2170
2171    if( a->time != b->time )
2172        return a->time < b->time ? -1 : 1;
2173
2174    /* In order to avoid fragmenting the swarm, peers from trackers and
2175     * from the DHT should be preferred to peers from PEX. */
2176    if( a->from != b->from )
2177        return a->from < b->from ? -1 : 1;
2178
2179    return 0;
2180}
2181
2182static int
2183getReconnectIntervalSecs( const struct peer_atom * atom )
2184{
2185    int          sec;
2186    const time_t now = time( NULL );
2187
2188    /* if we were recently connected to this peer and transferring piece
2189     * data, try to reconnect to them sooner rather that later -- we don't
2190     * want network troubles to get in the way of a good peer. */
2191    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2192        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2193
2194    /* don't allow reconnects more often than our minimum */
2195    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2196        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2197
2198    /* otherwise, the interval depends on how many times we've tried
2199     * and failed to connect to the peer */
2200    else switch( atom->numFails ) {
2201        case 0: sec = 0; break;
2202        case 1: sec = 5; break;
2203        case 2: sec = 2 * 60; break;
2204        case 3: sec = 15 * 60; break;
2205        case 4: sec = 30 * 60; break;
2206        case 5: sec = 60 * 60; break;
2207        default: sec = 120 * 60; break;
2208    }
2209
2210    return sec;
2211}
2212
2213static struct peer_atom **
2214getPeerCandidates( Torrent * t, int * setmeSize )
2215{
2216    int                 i, atomCount, retCount;
2217    struct peer_atom ** atoms;
2218    struct peer_atom ** ret;
2219    const time_t        now = time( NULL );
2220    const int           seed = tr_torrentIsSeed( t->tor );
2221
2222    assert( torrentIsLocked( t ) );
2223
2224    atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
2225    ret = tr_new( struct peer_atom*, atomCount );
2226    for( i = retCount = 0; i < atomCount; ++i )
2227    {
2228        int                interval;
2229        struct peer_atom * atom = atoms[i];
2230
2231        /* peer fed us too much bad data ... we only keep it around
2232         * now to weed it out in case someone sends it to us via pex */
2233        if( atom->myflags & MYFLAG_BANNED )
2234            continue;
2235
2236        /* peer was unconnectable before, so we're not going to keep trying.
2237         * this is needs a separate flag from `banned', since if they try
2238         * to connect to us later, we'll let them in */
2239        if( atom->myflags & MYFLAG_UNREACHABLE )
2240            continue;
2241
2242        /* we don't need two connections to the same peer... */
2243        if( peerIsInUse( t, &atom->addr ) )
2244            continue;
2245
2246        /* no need to connect if we're both seeds... */
2247        if( seed && ( ( atom->flags & ADDED_F_SEED_FLAG ) ||
2248                      ( atom->uploadOnly == UPLOAD_ONLY_YES ) ) )
2249            continue;
2250
2251        /* don't reconnect too often */
2252        interval = getReconnectIntervalSecs( atom );
2253        if( ( now - atom->time ) < interval )
2254        {
2255            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2256                    i, tr_peerIoAddrStr( &atom->addr, atom->port ), interval );
2257            continue;
2258        }
2259
2260        /* Don't connect to peers in our blocklist */
2261        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2262            continue;
2263
2264        ret[retCount++] = atom;
2265    }
2266
2267    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2268    *setmeSize = retCount;
2269    return ret;
2270}
2271
2272static void
2273closePeer( Torrent * t, tr_peer * peer )
2274{
2275    struct peer_atom * atom;
2276
2277    assert( t != NULL );
2278    assert( peer != NULL );
2279
2280    atom = peer->atom;
2281
2282    /* if we transferred piece data, then they might be good peers,
2283       so reset their `numFails' weight to zero.  otherwise we connected
2284       to them fruitlessly, so mark it as another fail */
2285    if( atom->piece_data_time )
2286        atom->numFails = 0;
2287    else
2288        ++atom->numFails;
2289
2290    tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
2291    removePeer( t, peer );
2292}
2293
2294static void
2295reconnectTorrent( Torrent * t )
2296{
2297    static time_t prevTime = 0;
2298    static int    newConnectionsThisSecond = 0;
2299    time_t        now;
2300
2301    now = time( NULL );
2302    if( prevTime != now )
2303    {
2304        prevTime = now;
2305        newConnectionsThisSecond = 0;
2306    }
2307
2308    if( !t->isRunning )
2309    {
2310        removeAllPeers( t );
2311    }
2312    else
2313    {
2314        int i;
2315        int canCloseCount;
2316        int mustCloseCount;
2317        int candidateCount;
2318        int maxCandidates;
2319        struct tr_peer ** canClose = getPeersToClose( t, TR_CAN_CLOSE, &canCloseCount );
2320        struct tr_peer ** mustClose = getPeersToClose( t, TR_MUST_CLOSE, &mustCloseCount );
2321        struct peer_atom ** candidates = getPeerCandidates( t, &candidateCount );
2322
2323        tordbg( t, "reconnect pulse for [%s]: "
2324                   "%d must-close connections, "
2325                   "%d can-close connections, "
2326                   "%d connection candidates, "
2327                   "%d atoms, "
2328                   "max per pulse is %d",
2329                   t->tor->info.name,
2330                   mustCloseCount,
2331                   canCloseCount,
2332                   candidateCount,
2333                   tr_ptrArraySize( &t->pool ),
2334                   MAX_RECONNECTIONS_PER_PULSE );
2335
2336        /* disconnect the really bad peers */
2337        for( i=0; i<mustCloseCount; ++i )
2338            closePeer( t, mustClose[i] );
2339
2340        /* decide how many peers can we try to add in this pass */
2341        maxCandidates = candidateCount;
2342        maxCandidates = MIN( maxCandidates, MAX_RECONNECTIONS_PER_PULSE );
2343        maxCandidates = MIN( maxCandidates, getMaxPeerCount( t->tor ) - getPeerCount( t ) );
2344        maxCandidates = MIN( maxCandidates, MAX_CONNECTIONS_PER_SECOND - newConnectionsThisSecond );
2345
2346        /* maybe disconnect some lesser peers, if we have candidates to replace them with */
2347        for( i=0; ( i<canCloseCount ) && ( i<maxCandidates ); ++i )
2348            closePeer( t, canClose[i] );
2349
2350        tordbg( t, "candidateCount is %d, MAX_RECONNECTIONS_PER_PULSE is %d,"
2351                   " getPeerCount(t) is %d, getMaxPeerCount(t) is %d, "
2352                   "newConnectionsThisSecond is %d, MAX_CONNECTIONS_PER_SECOND is %d",
2353                   candidateCount,
2354                   MAX_RECONNECTIONS_PER_PULSE,
2355                   getPeerCount( t ),
2356                   getMaxPeerCount( t->tor ),
2357                   newConnectionsThisSecond, MAX_CONNECTIONS_PER_SECOND );
2358
2359        /* add some new ones */
2360        for( i=0; i<maxCandidates; ++i )
2361        {
2362            tr_peerMgr        * mgr = t->manager;
2363            struct peer_atom  * atom = candidates[i];
2364            tr_peerIo         * io;
2365
2366            tordbg( t, "Starting an OUTGOING connection with %s",
2367                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2368
2369            io = tr_peerIoNewOutgoing( mgr->session, mgr->session->bandwidth, &atom->addr, atom->port, t->tor->info.hash );
2370
2371            if( io == NULL )
2372            {
2373                tordbg( t, "peerIo not created; marking peer %s as unreachable",
2374                        tr_peerIoAddrStr( &atom->addr, atom->port ) );
2375                atom->myflags |= MYFLAG_UNREACHABLE;
2376            }
2377            else
2378            {
2379                tr_handshake * handshake = tr_handshakeNew( io,
2380                                                            mgr->session->encryptionMode,
2381                                                            myHandshakeDoneCB,
2382                                                            mgr );
2383
2384                assert( tr_peerIoGetTorrentHash( io ) );
2385
2386                tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewOutgoing() */
2387
2388                ++newConnectionsThisSecond;
2389
2390                tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
2391                                         handshakeCompare );
2392            }
2393
2394            atom->time = time( NULL );
2395        }
2396
2397        /* cleanup */
2398        tr_free( candidates );
2399        tr_free( mustClose );
2400        tr_free( canClose );
2401    }
2402}
2403
2404struct peer_liveliness
2405{
2406    tr_peer * peer;
2407    void * clientData;
2408    time_t pieceDataTime;
2409    time_t time;
2410    int speed;
2411    tr_bool doPurge;
2412};
2413
2414static int
2415comparePeerLiveliness( const void * va, const void * vb )
2416{
2417    const struct peer_liveliness * a = va;
2418    const struct peer_liveliness * b = vb;
2419
2420    if( a->doPurge != b->doPurge )
2421        return a->doPurge ? 1 : -1;
2422
2423    if( a->speed != b->speed ) /* faster goes first */
2424        return a->speed > b->speed ? -1 : 1;
2425
2426    /* the one to give us data more recently goes first */
2427    if( a->pieceDataTime != b->pieceDataTime )
2428        return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
2429
2430    /* the one we connected to most recently goes first */
2431    if( a->time != b->time )
2432        return a->time > b->time ? -1 : 1;
2433
2434    return 0;
2435}
2436
2437/* FIXME: getPeersToClose() should use this */
2438static void
2439sortPeersByLiveliness( tr_peer ** peers, void** clientData, int n, uint64_t now )
2440{
2441    int i;
2442    struct peer_liveliness *lives, *l;
2443
2444    /* build a sortable array of peer + extra info */
2445    lives = l = tr_new0( struct peer_liveliness, n );
2446    for( i=0; i<n; ++i, ++l ) {
2447        tr_peer * p = peers[i];
2448        l->peer = p;
2449        l->doPurge = p->doPurge;
2450        l->pieceDataTime = p->atom->piece_data_time;
2451        l->time = p->atom->time;
2452        l->speed = 1024.0 * (   tr_peerGetPieceSpeed( p, now, TR_UP )
2453                              + tr_peerGetPieceSpeed( p, now, TR_DOWN ) );
2454        if( clientData )
2455            l->clientData = clientData[i];
2456    }
2457
2458    /* sort 'em */
2459    assert( n == ( l - lives ) );
2460    qsort( lives, n, sizeof( struct peer_liveliness ), comparePeerLiveliness );
2461
2462    /* build the peer array */
2463    for( i=0, l=lives; i<n; ++i, ++l ) {
2464        peers[i] = l->peer;
2465        if( clientData )
2466            clientData[i] = l->clientData;
2467    }
2468    assert( n == ( l - lives ) );
2469
2470    /* cleanup */
2471    tr_free( lives );
2472}
2473
2474static void
2475enforceTorrentPeerLimit( Torrent * t, uint64_t now )
2476{
2477    int n = tr_ptrArraySize( &t->peers );
2478    const int max = tr_torrentGetPeerLimit( t->tor );
2479    if( n > max )
2480    {
2481        void * base = tr_ptrArrayBase( &t->peers );
2482        tr_peer ** peers = tr_memdup( base, n*sizeof( tr_peer* ) );
2483        sortPeersByLiveliness( peers, NULL, n, now );
2484        while( n > max )
2485            closePeer( t, peers[--n] );
2486        tr_free( peers );
2487    }
2488}
2489
2490static void
2491enforceSessionPeerLimit( tr_session * session, uint64_t now )
2492{
2493    int n = 0;
2494    tr_torrent * tor = NULL;
2495    const int max = tr_sessionGetPeerLimit( session );
2496
2497    /* count the total number of peers */
2498    while(( tor = tr_torrentNext( session, tor )))
2499        n += tr_ptrArraySize( &tor->torrentPeers->peers );
2500
2501    /* if there are too many, prune out the worst */
2502    if( n > max )
2503    {
2504        tr_peer ** peers = tr_new( tr_peer*, n );
2505        Torrent ** torrents = tr_new( Torrent*, n );
2506
2507        /* populate the peer array */
2508        n = 0;
2509        tor = NULL;
2510        while(( tor = tr_torrentNext( session, tor ))) {
2511            int i;
2512            Torrent * t = tor->torrentPeers;
2513            const int tn = tr_ptrArraySize( &t->peers );
2514            for( i=0; i<tn; ++i, ++n ) {
2515                peers[n] = tr_ptrArrayNth( &t->peers, i );
2516                torrents[n] = t;
2517            }
2518        }
2519
2520        /* sort 'em */
2521        sortPeersByLiveliness( peers, (void**)torrents, n, now );
2522
2523        /* cull out the crappiest */
2524        while( n-- > max )
2525            closePeer( torrents[n], peers[n] );
2526
2527        /* cleanup */
2528        tr_free( torrents );
2529        tr_free( peers );
2530    }
2531}
2532
2533
2534static int
2535reconnectPulse( void * vmgr )
2536{
2537    tr_torrent * tor;
2538    tr_peerMgr * mgr = vmgr;
2539    uint64_t now;
2540    managerLock( mgr );
2541
2542    now = tr_date( );
2543
2544    /* if we're over the per-torrent peer limits, cull some peers */
2545    tor = NULL;
2546    while(( tor = tr_torrentNext( mgr->session, tor )))
2547        if( tor->isRunning )
2548            enforceTorrentPeerLimit( tor->torrentPeers, now );
2549
2550    /* if we're over the per-session peer limits, cull some peers */
2551    enforceSessionPeerLimit( mgr->session, now );
2552
2553    tor = NULL;
2554    while(( tor = tr_torrentNext( mgr->session, tor )))
2555        if( tor->isRunning )
2556            reconnectTorrent( tor->torrentPeers );
2557
2558    managerUnlock( mgr );
2559    return TRUE;
2560}
2561
2562/****
2563*****
2564*****  BANDWIDTH ALLOCATION
2565*****
2566****/
2567
2568static void
2569pumpAllPeers( tr_peerMgr * mgr )
2570{
2571    tr_torrent * tor = NULL;
2572
2573    while(( tor = tr_torrentNext( mgr->session, tor )))
2574    {
2575        int j;
2576        Torrent * t = tor->torrentPeers;
2577
2578        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
2579        {
2580            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
2581            tr_peerMsgsPulse( peer->msgs );
2582        }
2583    }
2584}
2585
2586static int
2587bandwidthPulse( void * vmgr )
2588{
2589    tr_torrent * tor = NULL;
2590    tr_peerMgr * mgr = vmgr;
2591    managerLock( mgr );
2592
2593    /* FIXME: this next line probably isn't necessary... */
2594    pumpAllPeers( mgr );
2595
2596    /* allocate bandwidth to the peers */
2597    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
2598    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
2599
2600    /* possibly stop torrents that have seeded enough */
2601    while(( tor = tr_torrentNext( mgr->session, tor ))) {
2602        if( tor->needsSeedRatioCheck ) {
2603            tor->needsSeedRatioCheck = FALSE;
2604            tr_torrentCheckSeedRatio( tor );
2605        }
2606    }
2607
2608    /* possibly stop torrents that have an error */
2609    tor = NULL;
2610    while(( tor = tr_torrentNext( mgr->session, tor )))
2611        if( tor->isRunning && ( tor->error == TR_STAT_LOCAL_ERROR ))
2612            tr_torrentStop( tor );
2613
2614    managerUnlock( mgr );
2615    return TRUE;
2616}
Note: See TracBrowser for help on using the repository browser.