source: branches/1.7x/libtransmission/peer-mgr.c @ 9090

Last change on this file since 9090 was 9090, checked in by charles, 13 years ago

(1.7x libT) fix a couple of compiler warnings from the last commit.

  • Property svn:keywords set to Date Rev Author Id
File size: 74.8 KB
Line 
1/*
2 * This file Copyright (C) 2007-2009 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 9090 2009-09-10 03:29:17Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h> /* memcpy, memcmp, strstr */
16#include <stdlib.h> /* qsort */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "bandwidth.h"
23#include "bencode.h"
24#include "blocklist.h"
25#include "clients.h"
26#include "completion.h"
27#include "crypto.h"
28#include "fdlimit.h"
29#include "handshake.h"
30#include "inout.h" /* tr_ioTestPiece */
31#include "net.h"
32#include "peer-io.h"
33#include "peer-mgr.h"
34#include "peer-msgs.h"
35#include "ptrarray.h"
36#include "session.h"
37#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
38#include "torrent.h"
39#include "trevent.h"
40#include "utils.h"
41#include "webseed.h"
42
43enum
44{
45    /* how frequently to change which peers are choked */
46    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
47
48    /* minimum interval for refilling peers' request lists */
49    REFILL_PERIOD_MSEC = 400,
50
51    /* how frequently to reallocate bandwidth */
52    BANDWIDTH_PERIOD_MSEC = 500,
53
54    /* how frequently to age out old piece request lists */
55    REFILL_UPKEEP_PERIOD_MSEC = ( 10 * 1000 ),
56
57    /* how frequently to decide which peers live and die */
58    RECONNECT_PERIOD_MSEC = 500,
59
60    /* when many peers are available, keep idle ones this long */
61    MIN_UPLOAD_IDLE_SECS = ( 30 ),
62
63    /* when few peers are available, keep idle ones this long */
64    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
65
66    /* max # of peers to ask fer per torrent per reconnect pulse */
67    MAX_RECONNECTIONS_PER_PULSE = 16,
68
69    /* max number of peers to ask for per second overall.
70    * this throttle is to avoid overloading the router */
71    MAX_CONNECTIONS_PER_SECOND = 32,
72
73    /* number of bad pieces a peer is allowed to send before we ban them */
74    MAX_BAD_PIECES_PER_PEER = 5,
75
76    /* amount of time to keep a list of request pieces lying around
77       before it's considered too old and needs to be rebuilt */
78    PIECE_LIST_SHELF_LIFE_SECS = 60,
79
80    /* use for bitwise operations w/peer_atom.myflags */
81    MYFLAG_BANNED = 1,
82
83    /* use for bitwise operations w/peer_atom.myflags */
84    /* unreachable for now... but not banned.
85     * if they try to connect to us it's okay */
86    MYFLAG_UNREACHABLE = 2,
87
88    /* the minimum we'll wait before attempting to reconnect to a peer */
89    MINIMUM_RECONNECT_INTERVAL_SECS = 5
90};
91
92
93/**
94***
95**/
96
97enum
98{
99    UPLOAD_ONLY_UKNOWN,
100    UPLOAD_ONLY_YES,
101    UPLOAD_ONLY_NO
102};
103
104/**
105 * Peer information that should be kept even before we've connected and
106 * after we've disconnected.  These are kept in a pool of peer_atoms to decide
107 * which ones would make good candidates for connecting to, and to watch out
108 * for banned peers.
109 *
110 * @see tr_peer
111 * @see tr_peermsgs
112 */
113struct peer_atom
114{
115    uint8_t     from;
116    uint8_t     flags;       /* these match the added_f flags */
117    uint8_t     myflags;     /* flags that aren't defined in added_f */
118    uint8_t     uploadOnly;  /* UPLOAD_ONLY_ */
119    tr_port     port;
120    uint16_t    numFails;
121    tr_address  addr;
122    time_t      time;        /* when the peer's connection status last changed */
123    time_t      piece_data_time;
124};
125
126static tr_bool
127tr_isAtom( const struct peer_atom * atom )
128{
129    return ( atom != NULL )
130        && ( atom->from < TR_PEER_FROM__MAX )
131        && ( tr_isAddress( &atom->addr ) );
132}
133
134struct tr_blockIterator
135{
136    time_t expirationDate;
137    struct tr_torrent_peers * t;
138    tr_block_index_t blockIndex, blockCount, *blocks;
139    tr_piece_index_t pieceIndex, pieceCount, *pieces;
140};
141
142typedef struct tr_torrent_peers
143{
144    struct event               refillTimer;
145
146    tr_ptrArray                outgoingHandshakes; /* tr_handshake */
147    tr_ptrArray                pool; /* struct peer_atom */
148    tr_ptrArray                peers; /* tr_peer */
149    tr_ptrArray                webseeds; /* tr_webseed */
150
151    tr_torrent               * tor;
152    tr_peer                  * optimistic; /* the optimistic peer, or NULL if none */
153    struct tr_blockIterator  * refillQueue; /* used in refillPulse() */
154    struct tr_peerMgr        * manager;
155    int                      * pendingRequestCount;
156
157    tr_bool                    isRunning;
158}
159Torrent;
160
161struct tr_peerMgr
162{
163    tr_session      * session;
164    tr_ptrArray       incomingHandshakes; /* tr_handshake */
165    tr_timer        * bandwidthTimer;
166    tr_timer        * rechokeTimer;
167    tr_timer        * reconnectTimer;
168    tr_timer        * refillUpkeepTimer;
169};
170
171#define tordbg( t, ... ) \
172    do { \
173        if( tr_deepLoggingIsActive( ) ) \
174            tr_deepLog( __FILE__, __LINE__, t->tor->info.name, __VA_ARGS__ ); \
175    } while( 0 )
176
177#define dbgmsg( ... ) \
178    do { \
179        if( tr_deepLoggingIsActive( ) ) \
180            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
181    } while( 0 )
182
183/**
184***
185**/
186
187static TR_INLINE void
188managerLock( const struct tr_peerMgr * manager )
189{
190    tr_globalLock( manager->session );
191}
192
193static TR_INLINE void
194managerUnlock( const struct tr_peerMgr * manager )
195{
196    tr_globalUnlock( manager->session );
197}
198
199static TR_INLINE void
200torrentLock( Torrent * torrent )
201{
202    managerLock( torrent->manager );
203}
204
205static TR_INLINE void
206torrentUnlock( Torrent * torrent )
207{
208    managerUnlock( torrent->manager );
209}
210
211static TR_INLINE int
212torrentIsLocked( const Torrent * t )
213{
214    return tr_globalIsLocked( t->manager->session );
215}
216
217/**
218***
219**/
220
221static int
222handshakeCompareToAddr( const void * va, const void * vb )
223{
224    const tr_handshake * a = va;
225
226    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
227}
228
229static int
230handshakeCompare( const void * a, const void * b )
231{
232    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
233}
234
235static tr_handshake*
236getExistingHandshake( tr_ptrArray      * handshakes,
237                      const tr_address * addr )
238{
239    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
240}
241
242static int
243comparePeerAtomToAddress( const void * va, const void * vb )
244{
245    const struct peer_atom * a = va;
246
247    return tr_compareAddresses( &a->addr, vb );
248}
249
250static int
251comparePeerAtoms( const void * va, const void * vb )
252{
253    const struct peer_atom * b = vb;
254
255    assert( tr_isAtom( b ) );
256
257    return comparePeerAtomToAddress( va, &b->addr );
258}
259
260/**
261***
262**/
263
264static Torrent*
265getExistingTorrent( tr_peerMgr *    manager,
266                    const uint8_t * hash )
267{
268    tr_torrent * tor = tr_torrentFindFromHash( manager->session, hash );
269
270    return tor == NULL ? NULL : tor->torrentPeers;
271}
272
273static int
274peerCompare( const void * va, const void * vb )
275{
276    const tr_peer * a = va;
277    const tr_peer * b = vb;
278
279    return tr_compareAddresses( &a->addr, &b->addr );
280}
281
282static int
283peerCompareToAddr( const void * va, const void * vb )
284{
285    const tr_peer * a = va;
286
287    return tr_compareAddresses( &a->addr, vb );
288}
289
290static tr_peer*
291getExistingPeer( Torrent          * torrent,
292                 const tr_address * addr )
293{
294    assert( torrentIsLocked( torrent ) );
295    assert( addr );
296
297    return tr_ptrArrayFindSorted( &torrent->peers, addr, peerCompareToAddr );
298}
299
300static struct peer_atom*
301getExistingAtom( const Torrent    * t,
302                 const tr_address * addr )
303{
304    Torrent * tt = (Torrent*)t;
305    assert( torrentIsLocked( t ) );
306    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
307}
308
309static tr_bool
310peerIsInUse( const Torrent    * ct,
311             const tr_address * addr )
312{
313    Torrent * t = (Torrent*) ct;
314
315    assert( torrentIsLocked ( t ) );
316
317    return getExistingPeer( t, addr )
318        || getExistingHandshake( &t->outgoingHandshakes, addr )
319        || getExistingHandshake( &t->manager->incomingHandshakes, addr );
320}
321
322static tr_peer*
323peerConstructor( const tr_address * addr )
324{
325    tr_peer * p;
326    p = tr_new0( tr_peer, 1 );
327    p->addr = *addr;
328    return p;
329}
330
331static tr_peer*
332getPeer( Torrent          * torrent,
333         const tr_address * addr )
334{
335    tr_peer * peer;
336
337    assert( torrentIsLocked( torrent ) );
338
339    peer = getExistingPeer( torrent, addr );
340
341    if( peer == NULL )
342    {
343        peer = peerConstructor( addr );
344        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
345    }
346
347    return peer;
348}
349
350static void
351peerDestructor( tr_peer * peer )
352{
353    assert( peer );
354
355    if( peer->msgs != NULL )
356    {
357        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
358        tr_peerMsgsFree( peer->msgs );
359    }
360
361    tr_peerIoClear( peer->io );
362    tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
363
364    tr_bitfieldFree( peer->have );
365    tr_bitfieldFree( peer->blame );
366    tr_free( peer->client );
367
368    tr_free( peer );
369}
370
371static void
372removePeer( Torrent * t, tr_peer * peer )
373{
374    tr_peer * removed;
375    struct peer_atom * atom = peer->atom;
376
377    assert( torrentIsLocked( t ) );
378    assert( atom );
379
380    atom->time = time( NULL );
381
382    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
383    assert( removed == peer );
384    peerDestructor( removed );
385}
386
387static void
388removeAllPeers( Torrent * t )
389{
390    while( !tr_ptrArrayEmpty( &t->peers ) )
391        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
392}
393
394static void blockIteratorFree( struct tr_blockIterator ** inout );
395
396static void
397torrentDestructor( void * vt )
398{
399    Torrent * t = vt;
400
401    assert( t );
402    assert( !t->isRunning );
403    assert( torrentIsLocked( t ) );
404    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
405    assert( tr_ptrArrayEmpty( &t->peers ) );
406
407    evtimer_del( &t->refillTimer );
408
409    blockIteratorFree( &t->refillQueue );
410    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
411    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
412    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
413    tr_ptrArrayDestruct( &t->peers, NULL );
414
415    tr_free( t->pendingRequestCount );
416    tr_free( t );
417}
418
419
420static void refillPulse( int, short, void* );
421
422static void peerCallbackFunc( void * vpeer,
423                              void * vevent,
424                              void * vt );
425
426static Torrent*
427torrentConstructor( tr_peerMgr * manager,
428                    tr_torrent * tor )
429{
430    int       i;
431    Torrent * t;
432
433    t = tr_new0( Torrent, 1 );
434    t->manager = manager;
435    t->tor = tor;
436    t->pool = TR_PTR_ARRAY_INIT;
437    t->peers = TR_PTR_ARRAY_INIT;
438    t->webseeds = TR_PTR_ARRAY_INIT;
439    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
440    evtimer_set( &t->refillTimer, refillPulse, t );
441
442
443    for( i = 0; i < tor->info.webseedCount; ++i )
444    {
445        tr_webseed * w =
446            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
447        tr_ptrArrayAppend( &t->webseeds, w );
448    }
449
450    return t;
451}
452
453
454static int bandwidthPulse ( void * vmgr );
455static int rechokePulse   ( void * vmgr );
456static int reconnectPulse ( void * vmgr );
457static int refillUpkeep   ( void * vmgr );
458
459tr_peerMgr*
460tr_peerMgrNew( tr_session * session )
461{
462    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
463    m->session = session;
464    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
465    return m;
466}
467
468static void
469deleteTimers( struct tr_peerMgr * m )
470{
471    if( m->bandwidthTimer )
472        tr_timerFree( &m->bandwidthTimer );
473
474    if( m->rechokeTimer )
475        tr_timerFree( &m->rechokeTimer );
476
477    if( m->reconnectTimer )
478        tr_timerFree( &m->reconnectTimer );
479
480    if( m->refillUpkeepTimer )
481        tr_timerFree( &m->refillUpkeepTimer );
482}
483
484void
485tr_peerMgrFree( tr_peerMgr * manager )
486{
487    managerLock( manager );
488
489    deleteTimers( manager );
490
491    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
492     * the item from manager->handshakes, so this is a little roundabout... */
493    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
494        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
495
496    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
497
498    managerUnlock( manager );
499    tr_free( manager );
500}
501
502static int
503clientIsDownloadingFrom( const tr_peer * peer )
504{
505    return peer->clientIsInterested && !peer->clientIsChoked;
506}
507
508static int
509clientIsUploadingTo( const tr_peer * peer )
510{
511    return peer->peerIsInterested && !peer->peerIsChoked;
512}
513
514/***
515****
516***/
517
518tr_bool
519tr_peerMgrPeerIsSeed( const tr_torrent  * tor,
520                      const tr_address  * addr )
521{
522    tr_bool isSeed = FALSE;
523    const Torrent * t = tor->torrentPeers;
524    const struct peer_atom * atom = getExistingAtom( t, addr );
525
526    if( atom )
527        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
528
529    return isSeed;
530}
531
532/****
533*****
534*****  REFILL
535*****
536****/
537
538static void
539assertValidPiece( Torrent * t, tr_piece_index_t piece )
540{
541    assert( t );
542    assert( t->tor );
543    assert( piece < t->tor->info.pieceCount );
544}
545
546static int
547getPieceRequests( Torrent * t, tr_piece_index_t piece )
548{
549    assertValidPiece( t, piece );
550
551    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
552}
553
554static void
555incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
556{
557    assertValidPiece( t, piece );
558
559    if( t->pendingRequestCount == NULL )
560        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
561    t->pendingRequestCount[piece]++;
562}
563
564static void
565decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
566{
567    assertValidPiece( t, piece );
568
569    if( t->pendingRequestCount )
570        t->pendingRequestCount[piece]--;
571}
572
573struct tr_refill_piece
574{
575    tr_priority_t    priority;
576    uint32_t         piece;
577    uint32_t         peerCount;
578    int              random;
579    int              pendingRequestCount;
580    int              missingBlockCount;
581};
582
583static int
584compareRefillPiece( const void * aIn, const void * bIn )
585{
586    const struct tr_refill_piece * a = aIn;
587    const struct tr_refill_piece * b = bIn;
588
589    /* if one piece has a higher priority, it goes first */
590    if( a->priority != b->priority )
591        return a->priority > b->priority ? -1 : 1;
592
593    /* have a per-priority endgame */
594    if( a->pendingRequestCount != b->pendingRequestCount )
595        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
596
597    /* fewer missing pieces goes first */
598    if( a->missingBlockCount != b->missingBlockCount )
599        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
600
601    /* otherwise if one has fewer peers, it goes first */
602    if( a->peerCount != b->peerCount )
603        return a->peerCount < b->peerCount ? -1 : 1;
604
605    /* otherwise go with our random seed */
606    if( a->random != b->random )
607        return a->random < b->random ? -1 : 1;
608
609    return 0;
610}
611
612static tr_piece_index_t *
613getPreferredPieces( Torrent * t, tr_piece_index_t * pieceCount )
614{
615    const tr_torrent  * tor = t->tor;
616    const tr_info     * inf = &tor->info;
617    tr_piece_index_t    i;
618    tr_piece_index_t    poolSize = 0;
619    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
620    int                 peerCount;
621    const tr_peer    ** peers;
622
623    assert( torrentIsLocked( t ) );
624
625    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
626    peerCount = tr_ptrArraySize( &t->peers );
627
628    /* make a list of the pieces that we want but don't have */
629    for( i = 0; i < inf->pieceCount; ++i )
630        if( !tor->info.pieces[i].dnd
631                && !tr_cpPieceIsComplete( &tor->completion, i ) )
632            pool[poolSize++] = i;
633
634    /* sort the pool by which to request next */
635    if( poolSize > 1 )
636    {
637        tr_piece_index_t j;
638        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
639
640        for( j = 0; j < poolSize; ++j )
641        {
642            int k;
643            const tr_piece_index_t piece = pool[j];
644            struct tr_refill_piece * setme = p + j;
645
646            setme->piece = piece;
647            setme->priority = inf->pieces[piece].priority;
648            setme->peerCount = 0;
649            setme->random = tr_cryptoWeakRandInt( INT_MAX );
650            setme->pendingRequestCount = getPieceRequests( t, piece );
651            setme->missingBlockCount
652                         = tr_cpMissingBlocksInPiece( &tor->completion, piece );
653
654            for( k = 0; k < peerCount; ++k )
655            {
656                const tr_peer * peer = peers[k];
657                if( peer->peerIsInterested
658                        && !peer->clientIsChoked
659                        && tr_bitfieldHas( peer->have, piece ) )
660                    ++setme->peerCount;
661            }
662        }
663
664        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
665               compareRefillPiece );
666
667        for( j = 0; j < poolSize; ++j )
668            pool[j] = p[j].piece;
669
670        tr_free( p );
671    }
672
673    *pieceCount = poolSize;
674    return pool;
675}
676
677static struct tr_blockIterator*
678blockIteratorNew( Torrent * t )
679{
680    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
681    i->expirationDate = time( NULL ) + PIECE_LIST_SHELF_LIFE_SECS;
682    i->t = t;
683    i->pieces = getPreferredPieces( t, &i->pieceCount );
684    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCountInPiece );
685    tordbg( t, "creating new refill queue.. it contains %"PRIu32" pieces", i->pieceCount );
686    return i;
687}
688
689static tr_bool
690blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
691{
692    tr_bool found;
693    Torrent * t = i->t;
694    tr_torrent * tor = t->tor;
695
696    while( ( i->blockIndex == i->blockCount )
697        && ( i->pieceIndex < i->pieceCount ) )
698    {
699        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
700        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
701        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
702        tr_block_index_t block;
703
704        assert( index < tor->info.pieceCount );
705
706        i->blockCount = 0;
707        i->blockIndex = 0;
708        for( block=b; block!=e; ++block )
709            if( !tr_cpBlockIsCompleteFast( &tor->completion, block ) )
710                i->blocks[i->blockCount++] = block;
711    }
712
713    assert( i->blockCount <= tor->blockCountInPiece );
714
715    if(( found = ( i->blockIndex < i->blockCount )))
716        *setme = i->blocks[i->blockIndex++];
717
718    return found;
719}
720
721static void
722blockIteratorSkipCurrentPiece( struct tr_blockIterator * i )
723{
724    i->blockIndex = i->blockCount;
725}
726
727static void
728blockIteratorFree( struct tr_blockIterator ** inout )
729{
730    struct tr_blockIterator * it = *inout;
731
732    if( it != NULL )
733    {
734        tr_free( it->blocks );
735        tr_free( it->pieces );
736        tr_free( it );
737    }
738
739    *inout = NULL;
740}
741
742static tr_peer**
743getPeersUploadingToClient( Torrent * t,
744                           int *     setmeCount )
745{
746    int j;
747    int peerCount = 0;
748    int retCount = 0;
749    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
750    tr_peer ** ret = tr_new( tr_peer *, peerCount );
751
752    j = 0; /* this is a temporary test to make sure we walk through all the peers */
753    if( peerCount )
754    {
755        /* Get a list of peers we're downloading from.
756           Pick a different starting point each time so all peers
757           get a chance at being the first in line */
758        const int fencepost = tr_cryptoWeakRandInt( peerCount );
759        int i = fencepost;
760        do {
761            if( clientIsDownloadingFrom( peers[i] ) )
762                ret[retCount++] = peers[i];
763            i = ( i + 1 ) % peerCount;
764            ++j;
765        } while( i != fencepost );
766    }
767    assert( j == peerCount );
768    *setmeCount = retCount;
769    return ret;
770}
771
772static uint32_t
773getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
774{
775    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
776    const uint64_t blockPos = tor->blockSize * b;
777    assert( blockPos >= piecePos );
778    return (uint32_t)( blockPos - piecePos );
779}
780
781static int
782refillUpkeep( void * vmgr )
783{
784    tr_torrent * tor = NULL;
785    tr_peerMgr * mgr = vmgr;
786    time_t now;
787    managerLock( mgr );
788
789    now = time( NULL );
790    while(( tor = tr_torrentNext( mgr->session, tor ))) {
791        Torrent * t = tor->torrentPeers;
792        if( t && t->refillQueue && ( t->refillQueue->expirationDate <= now ) ) {
793            tordbg( t, "refill queue is past its shelf date; discarding." );
794            blockIteratorFree( &t->refillQueue );
795        }
796    }
797
798    managerUnlock( mgr );
799    return TRUE;
800}
801
802static void
803refillPulse( int fd UNUSED, short type UNUSED, void * vtorrent )
804{
805    tr_block_index_t block;
806    int peerCount;
807    int webseedCount;
808    tr_peer ** peers;
809    tr_webseed ** webseeds;
810    Torrent * t = vtorrent;
811    tr_torrent * tor = t->tor;
812    tr_bool hasNext = TRUE;
813
814    if( !t->isRunning )
815        return;
816    if( tr_torrentIsSeed( t->tor ) )
817        return;
818
819    torrentLock( t );
820    tordbg( t, "Refilling Request Buffers..." );
821
822    if( t->refillQueue == NULL )
823        t->refillQueue = blockIteratorNew( t );
824
825    peers = getPeersUploadingToClient( t, &peerCount );
826    webseedCount = tr_ptrArraySize( &t->webseeds );
827    webseeds = tr_memdup( tr_ptrArrayBase( &t->webseeds ),
828                          webseedCount * sizeof( tr_webseed* ) );
829
830    while( ( webseedCount || peerCount )
831        && (( hasNext = blockIteratorNext( t->refillQueue, &block ))) )
832    {
833        int j;
834        tr_bool handled = FALSE;
835
836        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
837        const uint32_t offset = getBlockOffsetInPiece( tor, block );
838        const uint32_t length = tr_torBlockCountBytes( tor, block );
839
840        assert( block < tor->blockCount );
841
842        /* find a peer who can ask for this block */
843        for( j=0; !handled && j<peerCount; )
844        {
845            const tr_addreq_t val = tr_peerMsgsAddRequest( peers[j]->msgs, index, offset, length );
846            switch( val )
847            {
848                case TR_ADDREQ_FULL:
849                case TR_ADDREQ_CLIENT_CHOKED:
850                    peers[j] = peers[--peerCount];
851                    break;
852
853                case TR_ADDREQ_MISSING:
854                case TR_ADDREQ_DUPLICATE:
855                    ++j;
856                    break;
857
858                case TR_ADDREQ_OK:
859                    incrementPieceRequests( t, index );
860                    handled = TRUE;
861                    break;
862
863                default:
864                    assert( 0 && "unhandled value" );
865                    break;
866            }
867        }
868
869        /* maybe one of the webseeds can do it */
870        for( j=0; !handled && j<webseedCount; )
871        {
872            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j], index, offset, length );
873            switch( val )
874            {
875                case TR_ADDREQ_FULL:
876                    webseeds[j] = webseeds[--webseedCount];
877                    break;
878
879                case TR_ADDREQ_OK:
880                    incrementPieceRequests( t, index );
881                    handled = TRUE;
882                    break;
883
884                default:
885                    assert( 0 && "unhandled value" );
886                    break;
887            }
888        }
889
890        if( !handled )
891            blockIteratorSkipCurrentPiece( t->refillQueue );
892    }
893
894    /* cleanup */
895    tr_free( webseeds );
896    tr_free( peers );
897
898    if( !hasNext ) {
899        tordbg( t, "refill queue has no more blocks to request... freeing (webseed count: %d, peer count: %d)", webseedCount, peerCount );
900        blockIteratorFree( &t->refillQueue );
901    }
902
903    torrentUnlock( t );
904}
905
906static void
907broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
908{
909    size_t i;
910    size_t peerCount;
911    tr_peer ** peers;
912
913    assert( torrentIsLocked( t ) );
914
915    tordbg( t, "got a block; cancelling any duplicate requests from peers %"PRIu32":%"PRIu32"->%"PRIu32, index, offset, length );
916
917    peerCount = tr_ptrArraySize( &t->peers );
918    peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
919    for( i=0; i<peerCount; ++i )
920        if( peers[i]->msgs )
921            tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
922}
923
924static void
925addStrike( Torrent * t, tr_peer * peer )
926{
927    tordbg( t, "increasing peer %s strike count to %d",
928            tr_peerIoAddrStr( &peer->addr,
929                              peer->port ), peer->strikes + 1 );
930
931    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
932    {
933        struct peer_atom * atom = peer->atom;
934        atom->myflags |= MYFLAG_BANNED;
935        peer->doPurge = 1;
936        tordbg( t, "banning peer %s", tr_peerIoAddrStr( &atom->addr, atom->port ) );
937    }
938}
939
940static void
941gotBadPiece( Torrent *        t,
942             tr_piece_index_t pieceIndex )
943{
944    tr_torrent *   tor = t->tor;
945    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
946
947    tor->corruptCur += byteCount;
948    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
949}
950
951static void
952refillSoon( Torrent * t )
953{
954    if( !evtimer_pending( &t->refillTimer, NULL ) )
955        tr_timerAdd( &t->refillTimer, 0, REFILL_PERIOD_MSEC );
956}
957
958static void
959peerSuggestedPiece( Torrent            * t UNUSED,
960                    tr_peer            * peer UNUSED,
961                    tr_piece_index_t     pieceIndex UNUSED,
962                    int                  isFastAllowed UNUSED )
963{
964#if 0
965    assert( t );
966    assert( peer );
967    assert( peer->msgs );
968
969    /* is this a valid piece? */
970    if(  pieceIndex >= t->tor->info.pieceCount )
971        return;
972
973    /* don't ask for it if we've already got it */
974    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
975        return;
976
977    /* don't ask for it if they don't have it */
978    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
979        return;
980
981    /* don't ask for it if we're choked and it's not fast */
982    if( !isFastAllowed && peer->clientIsChoked )
983        return;
984
985    /* request the blocks that we don't have in this piece */
986    {
987        tr_block_index_t block;
988        const tr_torrent * tor = t->tor;
989        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
990        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
991
992        for( block=start; block<end; ++block )
993        {
994            if( !tr_cpBlockIsComplete( tor->completion, block ) )
995            {
996                const uint32_t offset = getBlockOffsetInPiece( tor, block );
997                const uint32_t length = tr_torBlockCountBytes( tor, block );
998                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
999                incrementPieceRequests( t, pieceIndex );
1000            }
1001        }
1002    }
1003#endif
1004}
1005
1006static void
1007peerCallbackFunc( void * vpeer, void * vevent, void * vt )
1008{
1009    tr_peer * peer = vpeer; /* may be NULL if peer is a webseed */
1010    Torrent * t = vt;
1011    const tr_peer_event * e = vevent;
1012
1013    torrentLock( t );
1014
1015    switch( e->eventType )
1016    {
1017        case TR_PEER_UPLOAD_ONLY:
1018            /* update our atom */
1019            if( peer ) {
1020                if( e->uploadOnly ) {
1021                    peer->atom->uploadOnly = UPLOAD_ONLY_YES;
1022                    peer->atom->flags |= ADDED_F_SEED_FLAG;
1023                } else {
1024                    peer->atom->uploadOnly = UPLOAD_ONLY_NO;
1025                    peer->atom->flags &= ~ADDED_F_SEED_FLAG;
1026                }
1027            }
1028            break;
1029
1030        case TR_PEER_NEED_REQ:
1031            refillSoon( t );
1032            break;
1033
1034        case TR_PEER_CANCEL:
1035            decrementPieceRequests( t, e->pieceIndex );
1036            break;
1037
1038        case TR_PEER_PEER_GOT_DATA:
1039        {
1040            const time_t now = time( NULL );
1041            tr_torrent * tor = t->tor;
1042
1043            tr_torrentSetActivityDate( tor, now );
1044
1045            if( e->wasPieceData ) {
1046                tor->uploadedCur += e->length;
1047                tr_torrentSetDirty( tor );
1048            }
1049
1050            /* update the stats */
1051            if( e->wasPieceData )
1052                tr_statsAddUploaded( tor->session, e->length );
1053
1054            /* update our atom */
1055            if( peer && e->wasPieceData )
1056                peer->atom->piece_data_time = now;
1057
1058            tor->needsSeedRatioCheck = TRUE;
1059
1060            break;
1061        }
1062
1063        case TR_PEER_CLIENT_GOT_SUGGEST:
1064            if( peer )
1065                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1066            break;
1067
1068        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1069            if( peer )
1070                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1071            break;
1072
1073        case TR_PEER_CLIENT_GOT_DATA:
1074        {
1075            const time_t now = time( NULL );
1076            tr_torrent * tor = t->tor;
1077
1078            tr_torrentSetActivityDate( tor, now );
1079
1080            /* only add this to downloadedCur if we got it from a peer --
1081             * webseeds shouldn't count against our ratio.  As one tracker
1082             * admin put it, "Those pieces are downloaded directly from the
1083             * content distributor, not the peers, it is the tracker's job
1084             * to manage the swarms, not the web server and does not fit
1085             * into the jurisdiction of the tracker." */
1086            if( peer && e->wasPieceData ) {
1087                tor->downloadedCur += e->length;
1088                tr_torrentSetDirty( tor );
1089            }
1090
1091            /* update the stats */
1092            if( e->wasPieceData )
1093                tr_statsAddDownloaded( tor->session, e->length );
1094
1095            /* update our atom */
1096            if( peer && e->wasPieceData )
1097                peer->atom->piece_data_time = now;
1098
1099            break;
1100        }
1101
1102        case TR_PEER_PEER_PROGRESS:
1103        {
1104            if( peer )
1105            {
1106                struct peer_atom * atom = peer->atom;
1107                if( e->progress >= 1.0 ) {
1108                    tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1109                    atom->flags |= ADDED_F_SEED_FLAG;
1110                }
1111            }
1112            break;
1113        }
1114
1115        case TR_PEER_CLIENT_GOT_BLOCK:
1116        {
1117            tr_torrent * tor = t->tor;
1118
1119            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1120
1121            tr_cpBlockAdd( &tor->completion, block );
1122            tr_torrentSetDirty( tor );
1123            decrementPieceRequests( t, e->pieceIndex );
1124
1125            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1126
1127            if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1128            {
1129                const tr_piece_index_t p = e->pieceIndex;
1130                const tr_bool ok = tr_ioTestPiece( tor, p, NULL, 0 );
1131
1132                if( !ok )
1133                {
1134                    tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1135                               (unsigned long)p );
1136                }
1137
1138                tr_torrentSetHasPiece( tor, p, ok );
1139                tr_torrentSetPieceChecked( tor, p, TRUE );
1140                tr_peerMgrSetBlame( tor, p, ok );
1141
1142                if( !ok )
1143                {
1144                    gotBadPiece( t, p );
1145                }
1146                else
1147                {
1148                    int i;
1149                    int peerCount;
1150                    tr_peer ** peers;
1151                    tr_file_index_t fileIndex;
1152
1153                    peerCount = tr_ptrArraySize( &t->peers );
1154                    peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1155                    for( i=0; i<peerCount; ++i )
1156                        tr_peerMsgsHave( peers[i]->msgs, p );
1157
1158                    for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex )
1159                    {
1160                        const tr_file * file = &tor->info.files[fileIndex];
1161                        if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) && tr_cpFileIsComplete( &tor->completion, fileIndex ) )
1162                        {
1163                            char * path = tr_buildPath( tor->downloadDir, file->name, NULL );
1164                            tordbg( t, "closing recently-completed file \"%s\"", path );
1165                            tr_fdFileClose( path );
1166                            tr_free( path );
1167                        }
1168                    }
1169                }
1170
1171                tr_torrentRecheckCompleteness( tor );
1172            }
1173            break;
1174        }
1175
1176        case TR_PEER_ERROR:
1177            if( ( e->err == ERANGE ) || ( e->err == EMSGSIZE ) || ( e->err == ENOTCONN ) )
1178            {
1179                /* some protocol error from the peer */
1180                peer->doPurge = 1;
1181                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
1182                        tr_peerIoAddrStr( &peer->addr, peer->port ) );
1183            }
1184            else 
1185            {
1186                tordbg( t, "unhandled error: %s", tr_strerror( e->err ) );
1187            }
1188            break;
1189
1190        default:
1191            assert( 0 );
1192    }
1193
1194    torrentUnlock( t );
1195}
1196
1197static void
1198ensureAtomExists( Torrent          * t,
1199                  const tr_address * addr,
1200                  tr_port            port,
1201                  uint8_t            flags,
1202                  uint8_t            from )
1203{
1204    assert( tr_isAddress( addr ) );
1205    assert( from < TR_PEER_FROM__MAX );
1206
1207    if( getExistingAtom( t, addr ) == NULL )
1208    {
1209        struct peer_atom * a;
1210        a = tr_new0( struct peer_atom, 1 );
1211        a->addr = *addr;
1212        a->port = port;
1213        a->flags = flags;
1214        a->from = from;
1215        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr( &a->addr, a->port ) );
1216        tr_ptrArrayInsertSorted( &t->pool, a, comparePeerAtoms );
1217    }
1218}
1219
1220static int
1221getMaxPeerCount( const tr_torrent * tor )
1222{
1223    return tor->maxConnectedPeers;
1224}
1225
1226static int
1227getPeerCount( const Torrent * t )
1228{
1229    return tr_ptrArraySize( &t->peers );/* + tr_ptrArraySize( &t->outgoingHandshakes ); */
1230}
1231
1232/* FIXME: this is kind of a mess. */
1233static tr_bool
1234myHandshakeDoneCB( tr_handshake  * handshake,
1235                   tr_peerIo     * io,
1236                   tr_bool         isConnected,
1237                   const uint8_t * peer_id,
1238                   void          * vmanager )
1239{
1240    tr_bool            ok = isConnected;
1241    tr_bool            success = FALSE;
1242    tr_port            port;
1243    const tr_address * addr;
1244    tr_peerMgr       * manager = vmanager;
1245    Torrent          * t;
1246    tr_handshake     * ours;
1247
1248    assert( io );
1249    assert( tr_isBool( ok ) );
1250
1251    t = tr_peerIoHasTorrentHash( io )
1252        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1253        : NULL;
1254
1255    if( tr_peerIoIsIncoming ( io ) )
1256        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1257                                        handshake, handshakeCompare );
1258    else if( t )
1259        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1260                                        handshake, handshakeCompare );
1261    else
1262        ours = handshake;
1263
1264    assert( ours );
1265    assert( ours == handshake );
1266
1267    if( t )
1268        torrentLock( t );
1269
1270    addr = tr_peerIoGetAddress( io, &port );
1271
1272    if( !ok || !t || !t->isRunning )
1273    {
1274        if( t )
1275        {
1276            struct peer_atom * atom = getExistingAtom( t, addr );
1277            if( atom )
1278                ++atom->numFails;
1279        }
1280    }
1281    else /* looking good */
1282    {
1283        struct peer_atom * atom;
1284        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1285        atom = getExistingAtom( t, addr );
1286        atom->time = time( NULL );
1287        atom->piece_data_time = 0;
1288
1289        if( atom->myflags & MYFLAG_BANNED )
1290        {
1291            tordbg( t, "banned peer %s tried to reconnect",
1292                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
1293        }
1294        else if( tr_peerIoIsIncoming( io )
1295               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1296
1297        {
1298        }
1299        else
1300        {
1301            tr_peer * peer = getExistingPeer( t, addr );
1302
1303            if( peer ) /* we already have this peer */
1304            {
1305            }
1306            else
1307            {
1308                peer = getPeer( t, addr );
1309                tr_free( peer->client );
1310
1311                if( !peer_id )
1312                    peer->client = NULL;
1313                else {
1314                    char client[128];
1315                    tr_clientForId( client, sizeof( client ), peer_id );
1316                    peer->client = tr_strdup( client );
1317                }
1318
1319                peer->port = port;
1320                peer->atom = atom;
1321                peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is
1322                                                                balanced by our unref in peerDestructor()  */
1323                tr_peerIoSetParent( peer->io, t->tor->bandwidth );
1324                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1325
1326                success = TRUE;
1327            }
1328        }
1329    }
1330
1331    if( t )
1332        torrentUnlock( t );
1333
1334    return success;
1335}
1336
1337void
1338tr_peerMgrAddIncoming( tr_peerMgr * manager,
1339                       tr_address * addr,
1340                       tr_port      port,
1341                       int          socket )
1342{
1343    managerLock( manager );
1344
1345    if( tr_sessionIsAddressBlocked( manager->session, addr ) )
1346    {
1347        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
1348        tr_netClose( socket );
1349    }
1350    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
1351    {
1352        tr_netClose( socket );
1353    }
1354    else /* we don't have a connetion to them yet... */
1355    {
1356        tr_peerIo *    io;
1357        tr_handshake * handshake;
1358
1359        io = tr_peerIoNewIncoming( manager->session, manager->session->bandwidth, addr, port, socket );
1360
1361        handshake = tr_handshakeNew( io,
1362                                     manager->session->encryptionMode,
1363                                     myHandshakeDoneCB,
1364                                     manager );
1365
1366        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
1367
1368        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
1369                                 handshakeCompare );
1370    }
1371
1372    managerUnlock( manager );
1373}
1374
1375static tr_bool
1376tr_isPex( const tr_pex * pex )
1377{
1378    return pex && tr_isAddress( &pex->addr );
1379}
1380
1381void
1382tr_peerMgrAddPex( tr_torrent   *  tor,
1383                  uint8_t         from,
1384                  const tr_pex *  pex )
1385{
1386    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
1387    {
1388        Torrent * t = tor->torrentPeers;
1389        managerLock( t->manager );
1390
1391        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
1392            if( tr_isValidPeerAddress( &pex->addr, pex->port ) )
1393                ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from );
1394
1395        managerUnlock( t->manager );
1396    }
1397}
1398
1399tr_pex *
1400tr_peerMgrCompactToPex( const void *    compact,
1401                        size_t          compactLen,
1402                        const uint8_t * added_f,
1403                        size_t          added_f_len,
1404                        size_t *        pexCount )
1405{
1406    size_t          i;
1407    size_t          n = compactLen / 6;
1408    const uint8_t * walk = compact;
1409    tr_pex *        pex = tr_new0( tr_pex, n );
1410
1411    for( i = 0; i < n; ++i )
1412    {
1413        pex[i].addr.type = TR_AF_INET;
1414        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
1415        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1416        if( added_f && ( n == added_f_len ) )
1417            pex[i].flags = added_f[i];
1418    }
1419
1420    *pexCount = n;
1421    return pex;
1422}
1423
1424tr_pex *
1425tr_peerMgrCompact6ToPex( const void    * compact,
1426                         size_t          compactLen,
1427                         const uint8_t * added_f,
1428                         size_t          added_f_len,
1429                         size_t        * pexCount )
1430{
1431    size_t          i;
1432    size_t          n = compactLen / 18;
1433    const uint8_t * walk = compact;
1434    tr_pex *        pex = tr_new0( tr_pex, n );
1435
1436    for( i = 0; i < n; ++i )
1437    {
1438        pex[i].addr.type = TR_AF_INET6;
1439        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
1440        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1441        if( added_f && ( n == added_f_len ) )
1442            pex[i].flags = added_f[i];
1443    }
1444
1445    *pexCount = n;
1446    return pex;
1447}
1448
1449tr_pex *
1450tr_peerMgrArrayToPex( const void * array,
1451                      size_t       arrayLen,
1452                      size_t      * pexCount )
1453{
1454    size_t          i;
1455    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
1456    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
1457    const uint8_t * walk = array;
1458    tr_pex        * pex = tr_new0( tr_pex, n );
1459
1460    for( i = 0 ; i < n ; i++ ) {
1461        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
1462        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
1463        pex[i].flags = 0x00;
1464        walk += sizeof( tr_address ) + 2;
1465    }
1466
1467    *pexCount = n;
1468    return pex;
1469}
1470
1471/**
1472***
1473**/
1474
1475void
1476tr_peerMgrSetBlame( tr_torrent     * tor,
1477                    tr_piece_index_t pieceIndex,
1478                    int              success )
1479{
1480    if( !success )
1481    {
1482        int        peerCount, i;
1483        Torrent *  t = tor->torrentPeers;
1484        tr_peer ** peers;
1485
1486        assert( torrentIsLocked( t ) );
1487
1488        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
1489        for( i = 0; i < peerCount; ++i )
1490        {
1491            tr_peer * peer = peers[i];
1492            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1493            {
1494                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1495                        tr_peerIoAddrStr( &peer->addr, peer->port ),
1496                        pieceIndex, (int)peer->strikes + 1 );
1497                addStrike( t, peer );
1498            }
1499        }
1500    }
1501}
1502
1503int
1504tr_pexCompare( const void * va, const void * vb )
1505{
1506    const tr_pex * a = va;
1507    const tr_pex * b = vb;
1508    int i;
1509
1510    assert( tr_isPex( a ) );
1511    assert( tr_isPex( b ) );
1512
1513    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
1514        return i;
1515
1516    if( a->port != b->port )
1517        return a->port < b->port ? -1 : 1;
1518
1519    return 0;
1520}
1521
1522#if 0
1523static int
1524peerPrefersCrypto( const tr_peer * peer )
1525{
1526    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1527        return TRUE;
1528
1529    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1530        return FALSE;
1531
1532    return tr_peerIoIsEncrypted( peer->io );
1533}
1534#endif
1535
1536/* better goes first */
1537static int
1538compareAtomsByUsefulness( const void * va, const void *vb )
1539{
1540    const struct peer_atom * a = * (const struct peer_atom**) va;
1541    const struct peer_atom * b = * (const struct peer_atom**) vb;
1542
1543    assert( tr_isAtom( a ) );
1544    assert( tr_isAtom( b ) );
1545
1546    if( a->piece_data_time != b->piece_data_time )
1547        return a->piece_data_time > b->piece_data_time ? -1 : 1;
1548    if( a->from != b->from )
1549        return a->from < b->from ? -1 : 1;
1550    if( a->numFails != b->numFails )
1551        return a->numFails < b->numFails ? -1 : 1;
1552
1553    return 0;
1554}
1555
1556int
1557tr_peerMgrGetPeers( tr_torrent * tor, tr_pex ** setme_pex, uint8_t af, int maxPeerCount )
1558{
1559    int count = 0;
1560    const Torrent * t = tor->torrentPeers;
1561
1562    managerLock( t->manager );
1563
1564    {
1565        int i;
1566        const int atomCount = tr_ptrArraySize( &t->pool );
1567        const int pexCount = MIN( atomCount, maxPeerCount );
1568        const struct peer_atom ** atomsBase = (const struct peer_atom**) tr_ptrArrayBase( &t->pool );
1569        struct peer_atom ** atoms = tr_memdup( atomsBase, atomCount * sizeof( struct peer_atom * ) );
1570        /* for now, this will waste memory on torrents that have both
1571         * ipv6 and ipv4 peers */
1572        tr_pex * pex = tr_new0( tr_pex, atomCount );
1573        tr_pex * walk = pex;
1574
1575        qsort( atoms, atomCount, sizeof( struct peer_atom * ), compareAtomsByUsefulness );
1576
1577        for( i=0; i<atomCount && count<pexCount; ++i )
1578        {
1579            const struct peer_atom * atom = atoms[i];
1580            if( atom->addr.type == af )
1581            {
1582                assert( tr_isAddress( &atom->addr ) );
1583                walk->addr = atom->addr;
1584                walk->port = atom->port;
1585                walk->flags = atom->flags;
1586                ++count;
1587                ++walk;
1588            }
1589        }
1590
1591        assert( ( walk - pex ) == count );
1592        *setme_pex = pex;
1593
1594        tr_free( atoms );
1595    }
1596
1597    managerUnlock( t->manager );
1598    return count;
1599}
1600
1601static void
1602ensureMgrTimersExist( struct tr_peerMgr * m )
1603{
1604    tr_session * s = m->session;
1605
1606    if( m->bandwidthTimer == NULL )
1607        m->bandwidthTimer = tr_timerNew( s, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
1608
1609    if( m->rechokeTimer == NULL )
1610        m->rechokeTimer = tr_timerNew( s, rechokePulse, m, RECHOKE_PERIOD_MSEC );
1611
1612    if( m->reconnectTimer == NULL )
1613        m->reconnectTimer = tr_timerNew( s, reconnectPulse, m, RECONNECT_PERIOD_MSEC );
1614
1615    if( m->refillUpkeepTimer == NULL )
1616        m->refillUpkeepTimer = tr_timerNew( s, refillUpkeep, m, REFILL_UPKEEP_PERIOD_MSEC );
1617}
1618
1619void
1620tr_peerMgrStartTorrent( tr_torrent * tor )
1621{
1622    Torrent * t = tor->torrentPeers;
1623
1624    assert( t != NULL );
1625    managerLock( t->manager );
1626    ensureMgrTimersExist( t->manager );
1627
1628    if( !t->isRunning )
1629    {
1630        t->isRunning = TRUE;
1631
1632        if( !tr_ptrArrayEmpty( &t->webseeds ) )
1633            refillSoon( t );
1634    }
1635
1636    rechokePulse( t->manager );
1637    managerUnlock( t->manager );
1638}
1639
1640static void
1641stopTorrent( Torrent * t )
1642{
1643    assert( torrentIsLocked( t ) );
1644
1645    t->isRunning = FALSE;
1646
1647    /* disconnect the peers. */
1648    tr_ptrArrayForeach( &t->peers, (PtrArrayForeachFunc)peerDestructor );
1649    tr_ptrArrayClear( &t->peers );
1650
1651    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1652     * which removes the handshake from t->outgoingHandshakes... */
1653    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
1654        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
1655}
1656
1657void
1658tr_peerMgrStopTorrent( tr_torrent * tor )
1659{
1660    Torrent * t = tor->torrentPeers;
1661
1662    managerLock( t->manager );
1663
1664    stopTorrent( t );
1665
1666    managerUnlock( t->manager );
1667}
1668
1669void
1670tr_peerMgrAddTorrent( tr_peerMgr * manager,
1671                      tr_torrent * tor )
1672{
1673    managerLock( manager );
1674
1675    assert( tor );
1676    assert( tor->torrentPeers == NULL );
1677
1678    tor->torrentPeers = torrentConstructor( manager, tor );
1679
1680    managerUnlock( manager );
1681}
1682
1683void
1684tr_peerMgrRemoveTorrent( tr_torrent * tor )
1685{
1686    tr_torrentLock( tor );
1687
1688    stopTorrent( tor->torrentPeers );
1689    torrentDestructor( tor->torrentPeers );
1690
1691    tr_torrentUnlock( tor );
1692}
1693
1694void
1695tr_peerMgrTorrentAvailability( const tr_torrent * tor,
1696                               int8_t           * tab,
1697                               unsigned int       tabCount )
1698{
1699    tr_piece_index_t   i;
1700    const Torrent *    t;
1701    float              interval;
1702    tr_bool            isSeed;
1703    int                peerCount;
1704    const tr_peer **   peers;
1705    tr_torrentLock( tor );
1706
1707    t = tor->torrentPeers;
1708    tor = t->tor;
1709    interval = tor->info.pieceCount / (float)tabCount;
1710    isSeed = tor && ( tr_cpGetStatus ( &tor->completion ) == TR_SEED );
1711    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1712    peerCount = tr_ptrArraySize( &t->peers );
1713
1714    memset( tab, 0, tabCount );
1715
1716    for( i = 0; tor && i < tabCount; ++i )
1717    {
1718        const int piece = i * interval;
1719
1720        if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
1721            tab[i] = -1;
1722        else if( peerCount ) {
1723            int j;
1724            for( j = 0; j < peerCount; ++j )
1725                if( tr_bitfieldHas( peers[j]->have, i ) )
1726                    ++tab[i];
1727        }
1728    }
1729
1730    tr_torrentUnlock( tor );
1731}
1732
1733/* Returns the pieces that are available from peers */
1734tr_bitfield*
1735tr_peerMgrGetAvailable( const tr_torrent * tor )
1736{
1737    int i;
1738    int peerCount;
1739    Torrent * t = tor->torrentPeers;
1740    const tr_peer ** peers;
1741    tr_bitfield * pieces;
1742    managerLock( t->manager );
1743
1744    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1745    peerCount = tr_ptrArraySize( &t->peers );
1746    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1747    for( i=0; i<peerCount; ++i )
1748        tr_bitfieldOr( pieces, peers[i]->have );
1749
1750    managerUnlock( t->manager );
1751    return pieces;
1752}
1753
1754void
1755tr_peerMgrTorrentStats( tr_torrent       * tor,
1756                        int              * setmePeersKnown,
1757                        int              * setmePeersConnected,
1758                        int              * setmeSeedsConnected,
1759                        int              * setmeWebseedsSendingToUs,
1760                        int              * setmePeersSendingToUs,
1761                        int              * setmePeersGettingFromUs,
1762                        int              * setmePeersFrom )
1763{
1764    int i, size;
1765    const Torrent * t = tor->torrentPeers;
1766    const tr_peer ** peers;
1767    const tr_webseed ** webseeds;
1768
1769    managerLock( t->manager );
1770
1771    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1772    size = tr_ptrArraySize( &t->peers );
1773
1774    *setmePeersKnown           = tr_ptrArraySize( &t->pool );
1775    *setmePeersConnected       = 0;
1776    *setmeSeedsConnected       = 0;
1777    *setmePeersGettingFromUs   = 0;
1778    *setmePeersSendingToUs     = 0;
1779    *setmeWebseedsSendingToUs  = 0;
1780
1781    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1782        setmePeersFrom[i] = 0;
1783
1784    for( i=0; i<size; ++i )
1785    {
1786        const tr_peer * peer = peers[i];
1787        const struct peer_atom * atom = peer->atom;
1788
1789        if( peer->io == NULL ) /* not connected */
1790            continue;
1791
1792        ++*setmePeersConnected;
1793
1794        ++setmePeersFrom[atom->from];
1795
1796        if( clientIsDownloadingFrom( peer ) )
1797            ++*setmePeersSendingToUs;
1798
1799        if( clientIsUploadingTo( peer ) )
1800            ++*setmePeersGettingFromUs;
1801
1802        if( atom->flags & ADDED_F_SEED_FLAG )
1803            ++*setmeSeedsConnected;
1804    }
1805
1806    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1807    size = tr_ptrArraySize( &t->webseeds );
1808    for( i=0; i<size; ++i )
1809        if( tr_webseedIsActive( webseeds[i] ) )
1810            ++*setmeWebseedsSendingToUs;
1811
1812    managerUnlock( t->manager );
1813}
1814
1815float
1816tr_peerMgrGetWebseedSpeed( const tr_torrent * tor, uint64_t now )
1817{
1818    int i;
1819    float tmp;
1820    float ret = 0;
1821
1822    const Torrent * t = tor->torrentPeers;
1823    const int n = tr_ptrArraySize( &t->webseeds );
1824    const tr_webseed ** webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1825
1826    for( i=0; i<n; ++i )
1827        if( tr_webseedGetSpeed( webseeds[i], now, &tmp ) )
1828            ret += tmp;
1829
1830    return ret;
1831}
1832
1833
1834float*
1835tr_peerMgrWebSpeeds( const tr_torrent * tor )
1836{
1837    const Torrent * t = tor->torrentPeers;
1838    const tr_webseed ** webseeds;
1839    int i;
1840    int webseedCount;
1841    float * ret;
1842    uint64_t now;
1843
1844    assert( t->manager );
1845    managerLock( t->manager );
1846
1847    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1848    webseedCount = tr_ptrArraySize( &t->webseeds );
1849    assert( webseedCount == tor->info.webseedCount );
1850    ret = tr_new0( float, webseedCount );
1851    now = tr_date( );
1852
1853    for( i=0; i<webseedCount; ++i )
1854        if( !tr_webseedGetSpeed( webseeds[i], now, &ret[i] ) )
1855            ret[i] = -1.0;
1856
1857    managerUnlock( t->manager );
1858    return ret;
1859}
1860
1861double
1862tr_peerGetPieceSpeed( const tr_peer * peer, uint64_t now, tr_direction direction )
1863{
1864    return peer->io ? tr_peerIoGetPieceSpeed( peer->io, now, direction ) : 0.0;
1865}
1866
1867
1868struct tr_peer_stat *
1869tr_peerMgrPeerStats( const tr_torrent    * tor,
1870                     int                 * setmeCount )
1871{
1872    int i, size;
1873    const Torrent * t = tor->torrentPeers;
1874    const tr_peer ** peers;
1875    tr_peer_stat * ret;
1876    uint64_t now;
1877
1878    assert( t->manager );
1879    managerLock( t->manager );
1880
1881    size = tr_ptrArraySize( &t->peers );
1882    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1883    ret = tr_new0( tr_peer_stat, size );
1884    now = tr_date( );
1885
1886    for( i=0; i<size; ++i )
1887    {
1888        char *                   pch;
1889        const tr_peer *          peer = peers[i];
1890        const struct peer_atom * atom = peer->atom;
1891        tr_peer_stat *           stat = ret + i;
1892
1893        tr_ntop( &peer->addr, stat->addr, sizeof( stat->addr ) );
1894        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
1895                   sizeof( stat->client ) );
1896        stat->port               = ntohs( peer->port );
1897        stat->from               = atom->from;
1898        stat->progress           = peer->progress;
1899        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1900        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER );
1901        stat->rateToClient       = tr_peerGetPieceSpeed( peer, now, TR_PEER_TO_CLIENT );
1902        stat->peerIsChoked       = peer->peerIsChoked;
1903        stat->peerIsInterested   = peer->peerIsInterested;
1904        stat->clientIsChoked     = peer->clientIsChoked;
1905        stat->clientIsInterested = peer->clientIsInterested;
1906        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1907        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1908        stat->isUploadingTo      = clientIsUploadingTo( peer );
1909        stat->isSeed             = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
1910
1911        pch = stat->flagStr;
1912        if( t->optimistic == peer ) *pch++ = 'O';
1913        if( stat->isDownloadingFrom ) *pch++ = 'D';
1914        else if( stat->clientIsInterested ) *pch++ = 'd';
1915        if( stat->isUploadingTo ) *pch++ = 'U';
1916        else if( stat->peerIsInterested ) *pch++ = 'u';
1917        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
1918        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1919        if( stat->isEncrypted ) *pch++ = 'E';
1920        if( stat->from == TR_PEER_FROM_DHT ) *pch++ = 'H';
1921        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1922        if( stat->isIncoming ) *pch++ = 'I';
1923        *pch = '\0';
1924    }
1925
1926    *setmeCount = size;
1927
1928    managerUnlock( t->manager );
1929    return ret;
1930}
1931
1932/**
1933***
1934**/
1935
1936struct ChokeData
1937{
1938    tr_bool         doUnchoke;
1939    tr_bool         isInterested;
1940    tr_bool         isChoked;
1941    int             rate;
1942    tr_peer *       peer;
1943};
1944
1945static int
1946compareChoke( const void * va,
1947              const void * vb )
1948{
1949    const struct ChokeData * a = va;
1950    const struct ChokeData * b = vb;
1951
1952    if( a->rate != b->rate ) /* prefer higher overall speeds */
1953        return a->rate > b->rate ? -1 : 1;
1954
1955    if( a->isChoked != b->isChoked ) /* prefer unchoked */
1956        return a->isChoked ? 1 : -1;
1957
1958    return 0;
1959}
1960
1961static int
1962isNew( const tr_peer * peer )
1963{
1964    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1965}
1966
1967static int
1968isSame( const tr_peer * peer )
1969{
1970    return peer && peer->client && strstr( peer->client, "Transmission" );
1971}
1972
1973/**
1974***
1975**/
1976
1977static void
1978rechokeTorrent( Torrent * t, const uint64_t now )
1979{
1980    int i, size, unchokedInterested;
1981    const int peerCount = tr_ptrArraySize( &t->peers );
1982    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1983    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1984    const tr_session * session = t->manager->session;
1985    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
1986
1987    assert( torrentIsLocked( t ) );
1988
1989    /* sort the peers by preference and rate */
1990    for( i = 0, size = 0; i < peerCount; ++i )
1991    {
1992        tr_peer * peer = peers[i];
1993        struct peer_atom * atom = peer->atom;
1994
1995        if( peer->progress >= 1.0 ) /* choke all seeds */
1996        {
1997            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1998        }
1999        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
2000        {
2001            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2002        }
2003        else if( chokeAll ) /* choke everyone if we're not uploading */
2004        {
2005            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2006        }
2007        else
2008        {
2009            struct ChokeData * n = &choke[size++];
2010            n->peer         = peer;
2011            n->isInterested = peer->peerIsInterested;
2012            n->isChoked     = peer->peerIsChoked;
2013            n->rate         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER ) * 1024;
2014        }
2015    }
2016
2017    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
2018
2019    /**
2020     * Reciprocation and number of uploads capping is managed by unchoking
2021     * the N peers which have the best upload rate and are interested.
2022     * This maximizes the client's download rate. These N peers are
2023     * referred to as downloaders, because they are interested in downloading
2024     * from the client.
2025     *
2026     * Peers which have a better upload rate (as compared to the downloaders)
2027     * but aren't interested get unchoked. If they become interested, the
2028     * downloader with the worst upload rate gets choked. If a client has
2029     * a complete file, it uses its upload rate rather than its download
2030     * rate to decide which peers to unchoke.
2031     */
2032    unchokedInterested = 0;
2033    for( i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i ) {
2034        choke[i].doUnchoke = 1;
2035        if( choke[i].isInterested )
2036            ++unchokedInterested;
2037    }
2038
2039    /* optimistic unchoke */
2040    if( i < size )
2041    {
2042        int n;
2043        struct ChokeData * c;
2044        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
2045
2046        for( ; i<size; ++i )
2047        {
2048            if( choke[i].isInterested )
2049            {
2050                const tr_peer * peer = choke[i].peer;
2051                int x = 1, y;
2052                if( isNew( peer ) ) x *= 3;
2053                if( isSame( peer ) ) x *= 3;
2054                for( y=0; y<x; ++y )
2055                    tr_ptrArrayAppend( &randPool, &choke[i] );
2056            }
2057        }
2058
2059        if(( n = tr_ptrArraySize( &randPool )))
2060        {
2061            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
2062            c->doUnchoke = 1;
2063            t->optimistic = c->peer;
2064        }
2065
2066        tr_ptrArrayDestruct( &randPool, NULL );
2067    }
2068
2069    for( i=0; i<size; ++i )
2070        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
2071
2072    /* cleanup */
2073    tr_free( choke );
2074}
2075
2076static int
2077rechokePulse( void * vmgr )
2078{
2079    uint64_t now;
2080    tr_torrent * tor = NULL;
2081    tr_peerMgr * mgr = vmgr;
2082    managerLock( mgr );
2083
2084    now = tr_date( );
2085    while(( tor = tr_torrentNext( mgr->session, tor )))
2086        if( tor->isRunning )
2087            rechokeTorrent( tor->torrentPeers, now );
2088
2089    managerUnlock( mgr );
2090    return TRUE;
2091}
2092
2093/***
2094****
2095****  Life and Death
2096****
2097***/
2098
2099typedef enum
2100{
2101    TR_CAN_KEEP,
2102    TR_CAN_CLOSE,
2103    TR_MUST_CLOSE,
2104}
2105tr_close_type_t;
2106
2107static tr_close_type_t
2108shouldPeerBeClosed( const Torrent    * t,
2109                    const tr_peer    * peer,
2110                    int                peerCount )
2111{
2112    const tr_torrent *       tor = t->tor;
2113    const time_t             now = time( NULL );
2114    const struct peer_atom * atom = peer->atom;
2115
2116    /* if it's marked for purging, close it */
2117    if( peer->doPurge )
2118    {
2119        tordbg( t, "purging peer %s because its doPurge flag is set",
2120                tr_peerIoAddrStr( &atom->addr, atom->port ) );
2121        return TR_MUST_CLOSE;
2122    }
2123
2124    /* if we're seeding and the peer has everything we have,
2125     * and enough time has passed for a pex exchange, then disconnect */
2126    if( tr_torrentIsSeed( tor ) )
2127    {
2128        int peerHasEverything;
2129        if( atom->flags & ADDED_F_SEED_FLAG )
2130            peerHasEverything = TRUE;
2131        else if( peer->progress < tr_cpPercentDone( &tor->completion ) )
2132            peerHasEverything = FALSE;
2133        else {
2134            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( &tor->completion ) );
2135            tr_bitfieldDifference( tmp, peer->have );
2136            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2137            tr_bitfieldFree( tmp );
2138        }
2139
2140        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 )))
2141        {
2142            tordbg( t, "purging peer %s because we're both seeds",
2143                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
2144            return TR_MUST_CLOSE;
2145        }
2146    }
2147
2148    /* disconnect if it's been too long since piece data has been transferred.
2149     * this is on a sliding scale based on number of available peers... */
2150    {
2151        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2152        /* if we have >= relaxIfFewerThan, strictness is 100%.
2153         * if we have zero connections, strictness is 0% */
2154        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
2155                               ? 1.0
2156                               : peerCount / (float)relaxStrictnessIfFewerThanN;
2157        const int lo = MIN_UPLOAD_IDLE_SECS;
2158        const int hi = MAX_UPLOAD_IDLE_SECS;
2159        const int limit = hi - ( ( hi - lo ) * strictness );
2160        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
2161/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
2162        if( idleTime > limit ) {
2163            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
2164                       tr_peerIoAddrStr( &atom->addr, atom->port ), idleTime );
2165            return TR_CAN_CLOSE;
2166        }
2167    }
2168
2169    return TR_CAN_KEEP;
2170}
2171
2172static tr_peer **
2173getPeersToClose( Torrent * t, tr_close_type_t closeType, int * setmeSize )
2174{
2175    int i, peerCount, outsize;
2176    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
2177    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2178
2179    assert( torrentIsLocked( t ) );
2180
2181    for( i = outsize = 0; i < peerCount; ++i )
2182        if( shouldPeerBeClosed( t, peers[i], peerCount ) == closeType )
2183            ret[outsize++] = peers[i];
2184
2185    *setmeSize = outsize;
2186    return ret;
2187}
2188
2189static int
2190compareCandidates( const void * va, const void * vb )
2191{
2192    const struct peer_atom * a = *(const struct peer_atom**) va;
2193    const struct peer_atom * b = *(const struct peer_atom**) vb;
2194
2195    /* <Charles> Here we would probably want to try reconnecting to
2196     * peers that had most recently given us data. Lots of users have
2197     * trouble with resets due to their routers and/or ISPs. This way we
2198     * can quickly recover from an unwanted reset. So we sort
2199     * piece_data_time in descending order.
2200     */
2201
2202    if( a->piece_data_time != b->piece_data_time )
2203        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2204
2205    if( a->numFails != b->numFails )
2206        return a->numFails < b->numFails ? -1 : 1;
2207
2208    if( a->time != b->time )
2209        return a->time < b->time ? -1 : 1;
2210
2211    /* In order to avoid fragmenting the swarm, peers from trackers and
2212     * from the DHT should be preferred to peers from PEX. */
2213    if( a->from != b->from )
2214        return a->from < b->from ? -1 : 1;
2215
2216    return 0;
2217}
2218
2219static int
2220getReconnectIntervalSecs( const struct peer_atom * atom )
2221{
2222    int          sec;
2223    const time_t now = time( NULL );
2224
2225    /* if we were recently connected to this peer and transferring piece
2226     * data, try to reconnect to them sooner rather that later -- we don't
2227     * want network troubles to get in the way of a good peer. */
2228    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2229        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2230
2231    /* don't allow reconnects more often than our minimum */
2232    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2233        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2234
2235    /* otherwise, the interval depends on how many times we've tried
2236     * and failed to connect to the peer */
2237    else switch( atom->numFails ) {
2238        case 0: sec = 0; break;
2239        case 1: sec = 5; break;
2240        case 2: sec = 2 * 60; break;
2241        case 3: sec = 15 * 60; break;
2242        case 4: sec = 30 * 60; break;
2243        case 5: sec = 60 * 60; break;
2244        default: sec = 120 * 60; break;
2245    }
2246
2247    return sec;
2248}
2249
2250static struct peer_atom **
2251getPeerCandidates( Torrent * t, int * setmeSize )
2252{
2253    int                 i, atomCount, retCount;
2254    struct peer_atom ** atoms;
2255    struct peer_atom ** ret;
2256    const time_t        now = time( NULL );
2257    const int           seed = tr_torrentIsSeed( t->tor );
2258
2259    assert( torrentIsLocked( t ) );
2260
2261    atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
2262    ret = tr_new( struct peer_atom*, atomCount );
2263    for( i = retCount = 0; i < atomCount; ++i )
2264    {
2265        int                interval;
2266        struct peer_atom * atom = atoms[i];
2267
2268        /* peer fed us too much bad data ... we only keep it around
2269         * now to weed it out in case someone sends it to us via pex */
2270        if( atom->myflags & MYFLAG_BANNED )
2271            continue;
2272
2273        /* peer was unconnectable before, so we're not going to keep trying.
2274         * this is needs a separate flag from `banned', since if they try
2275         * to connect to us later, we'll let them in */
2276        if( atom->myflags & MYFLAG_UNREACHABLE )
2277            continue;
2278
2279        /* we don't need two connections to the same peer... */
2280        if( peerIsInUse( t, &atom->addr ) )
2281            continue;
2282
2283        /* no need to connect if we're both seeds... */
2284        if( seed && ( ( atom->flags & ADDED_F_SEED_FLAG ) ||
2285                      ( atom->uploadOnly == UPLOAD_ONLY_YES ) ) )
2286            continue;
2287
2288        /* don't reconnect too often */
2289        interval = getReconnectIntervalSecs( atom );
2290        if( ( now - atom->time ) < interval )
2291        {
2292            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2293                    i, tr_peerIoAddrStr( &atom->addr, atom->port ), interval );
2294            continue;
2295        }
2296
2297        /* Don't connect to peers in our blocklist */
2298        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2299            continue;
2300
2301        ret[retCount++] = atom;
2302    }
2303
2304    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2305    *setmeSize = retCount;
2306    return ret;
2307}
2308
2309static void
2310closePeer( Torrent * t, tr_peer * peer )
2311{
2312    struct peer_atom * atom;
2313
2314    assert( t != NULL );
2315    assert( peer != NULL );
2316
2317    atom = peer->atom;
2318
2319    /* if we transferred piece data, then they might be good peers,
2320       so reset their `numFails' weight to zero.  otherwise we connected
2321       to them fruitlessly, so mark it as another fail */
2322    if( atom->piece_data_time )
2323        atom->numFails = 0;
2324    else
2325        ++atom->numFails;
2326
2327    tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
2328    removePeer( t, peer );
2329}
2330
2331static void
2332reconnectTorrent( Torrent * t )
2333{
2334    static time_t prevTime = 0;
2335    static int    newConnectionsThisSecond = 0;
2336    time_t        now;
2337
2338    now = time( NULL );
2339    if( prevTime != now )
2340    {
2341        prevTime = now;
2342        newConnectionsThisSecond = 0;
2343    }
2344
2345    if( !t->isRunning )
2346    {
2347        removeAllPeers( t );
2348    }
2349    else
2350    {
2351        int i;
2352        int canCloseCount;
2353        int mustCloseCount;
2354        int candidateCount;
2355        int maxCandidates;
2356        struct tr_peer ** canClose = getPeersToClose( t, TR_CAN_CLOSE, &canCloseCount );
2357        struct tr_peer ** mustClose = getPeersToClose( t, TR_MUST_CLOSE, &mustCloseCount );
2358        struct peer_atom ** candidates = getPeerCandidates( t, &candidateCount );
2359
2360        tordbg( t, "reconnect pulse for [%s]: "
2361                   "%d must-close connections, "
2362                   "%d can-close connections, "
2363                   "%d connection candidates, "
2364                   "%d atoms, "
2365                   "max per pulse is %d",
2366                   t->tor->info.name,
2367                   mustCloseCount,
2368                   canCloseCount,
2369                   candidateCount,
2370                   tr_ptrArraySize( &t->pool ),
2371                   MAX_RECONNECTIONS_PER_PULSE );
2372
2373        /* disconnect the really bad peers */
2374        for( i=0; i<mustCloseCount; ++i )
2375            closePeer( t, mustClose[i] );
2376
2377        /* decide how many peers can we try to add in this pass */
2378        maxCandidates = candidateCount;
2379        maxCandidates = MIN( maxCandidates, MAX_RECONNECTIONS_PER_PULSE );
2380        maxCandidates = MIN( maxCandidates, getMaxPeerCount( t->tor ) - getPeerCount( t ) );
2381        maxCandidates = MIN( maxCandidates, MAX_CONNECTIONS_PER_SECOND - newConnectionsThisSecond );
2382
2383        /* maybe disconnect some lesser peers, if we have candidates to replace them with */
2384        for( i=0; ( i<canCloseCount ) && ( i<maxCandidates ); ++i )
2385            closePeer( t, canClose[i] );
2386
2387        tordbg( t, "candidateCount is %d, MAX_RECONNECTIONS_PER_PULSE is %d,"
2388                   " getPeerCount(t) is %d, getMaxPeerCount(t) is %d, "
2389                   "newConnectionsThisSecond is %d, MAX_CONNECTIONS_PER_SECOND is %d",
2390                   candidateCount,
2391                   MAX_RECONNECTIONS_PER_PULSE,
2392                   getPeerCount( t ),
2393                   getMaxPeerCount( t->tor ),
2394                   newConnectionsThisSecond, MAX_CONNECTIONS_PER_SECOND );
2395
2396        /* add some new ones */
2397        for( i=0; i<maxCandidates; ++i )
2398        {
2399            tr_peerMgr        * mgr = t->manager;
2400            struct peer_atom  * atom = candidates[i];
2401            tr_peerIo         * io;
2402
2403            tordbg( t, "Starting an OUTGOING connection with %s",
2404                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2405
2406            io = tr_peerIoNewOutgoing( mgr->session, mgr->session->bandwidth, &atom->addr, atom->port, t->tor->info.hash );
2407
2408            if( io == NULL )
2409            {
2410                tordbg( t, "peerIo not created; marking peer %s as unreachable",
2411                        tr_peerIoAddrStr( &atom->addr, atom->port ) );
2412                atom->myflags |= MYFLAG_UNREACHABLE;
2413            }
2414            else
2415            {
2416                tr_handshake * handshake = tr_handshakeNew( io,
2417                                                            mgr->session->encryptionMode,
2418                                                            myHandshakeDoneCB,
2419                                                            mgr );
2420
2421                assert( tr_peerIoGetTorrentHash( io ) );
2422
2423                tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewOutgoing() */
2424
2425                ++newConnectionsThisSecond;
2426
2427                tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
2428                                         handshakeCompare );
2429            }
2430
2431            atom->time = time( NULL );
2432        }
2433
2434        /* cleanup */
2435        tr_free( candidates );
2436        tr_free( mustClose );
2437        tr_free( canClose );
2438    }
2439}
2440
2441struct peer_liveliness
2442{
2443    tr_peer * peer;
2444    void * clientData;
2445    time_t pieceDataTime;
2446    time_t time;
2447    int speed;
2448    tr_bool doPurge;
2449};
2450
2451static int
2452comparePeerLiveliness( const void * va, const void * vb )
2453{
2454    const struct peer_liveliness * a = va;
2455    const struct peer_liveliness * b = vb;
2456
2457    if( a->doPurge != b->doPurge )
2458        return a->doPurge ? 1 : -1;
2459
2460    if( a->speed != b->speed ) /* faster goes first */
2461        return a->speed > b->speed ? -1 : 1;
2462
2463    /* the one to give us data more recently goes first */
2464    if( a->pieceDataTime != b->pieceDataTime )
2465        return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
2466
2467    /* the one we connected to most recently goes first */
2468    if( a->time != b->time )
2469        return a->time > b->time ? -1 : 1;
2470
2471    return 0;
2472}
2473
2474/* FIXME: getPeersToClose() should use this */
2475static void
2476sortPeersByLiveliness( tr_peer ** peers, void** clientData, int n, uint64_t now )
2477{
2478    int i;
2479    struct peer_liveliness *lives, *l;
2480
2481    /* build a sortable array of peer + extra info */
2482    lives = l = tr_new0( struct peer_liveliness, n );
2483    for( i=0; i<n; ++i, ++l ) {
2484        tr_peer * p = peers[i];
2485        l->peer = p;
2486        l->doPurge = p->doPurge;
2487        l->pieceDataTime = p->atom->piece_data_time;
2488        l->time = p->atom->time;
2489        l->speed = 1024.0 * (   tr_peerGetPieceSpeed( p, now, TR_UP )
2490                              + tr_peerGetPieceSpeed( p, now, TR_DOWN ) );
2491        if( clientData )
2492            l->clientData = clientData[i];
2493    }
2494
2495    /* sort 'em */
2496    assert( n == ( l - lives ) );
2497    qsort( lives, n, sizeof( struct peer_liveliness ), comparePeerLiveliness );
2498
2499    /* build the peer array */
2500    for( i=0, l=lives; i<n; ++i, ++l ) {
2501        peers[i] = l->peer;
2502        if( clientData )
2503            clientData[i] = l->clientData;
2504    }
2505    assert( n == ( l - lives ) );
2506
2507    /* cleanup */
2508    tr_free( lives );
2509}
2510
2511static void
2512enforceTorrentPeerLimit( Torrent * t, uint64_t now )
2513{
2514    int n = tr_ptrArraySize( &t->peers );
2515    const int max = tr_torrentGetPeerLimit( t->tor );
2516    if( n > max )
2517    {
2518        void * base = tr_ptrArrayBase( &t->peers );
2519        tr_peer ** peers = tr_memdup( base, n*sizeof( tr_peer* ) );
2520        sortPeersByLiveliness( peers, NULL, n, now );
2521        while( n > max )
2522            closePeer( t, peers[--n] );
2523        tr_free( peers );
2524    }
2525}
2526
2527static void
2528enforceSessionPeerLimit( tr_session * session, uint64_t now )
2529{
2530    int n = 0;
2531    tr_torrent * tor = NULL;
2532    const int max = tr_sessionGetPeerLimit( session );
2533
2534    /* count the total number of peers */
2535    while(( tor = tr_torrentNext( session, tor )))
2536        n += tr_ptrArraySize( &tor->torrentPeers->peers );
2537
2538    /* if there are too many, prune out the worst */
2539    if( n > max )
2540    {
2541        tr_peer ** peers = tr_new( tr_peer*, n );
2542        Torrent ** torrents = tr_new( Torrent*, n );
2543
2544        /* populate the peer array */
2545        n = 0;
2546        tor = NULL;
2547        while(( tor = tr_torrentNext( session, tor ))) {
2548            int i;
2549            Torrent * t = tor->torrentPeers;
2550            const int tn = tr_ptrArraySize( &t->peers );
2551            for( i=0; i<tn; ++i, ++n ) {
2552                peers[n] = tr_ptrArrayNth( &t->peers, i );
2553                torrents[n] = t;
2554            }
2555        }
2556
2557        /* sort 'em */
2558        sortPeersByLiveliness( peers, (void**)torrents, n, now );
2559
2560        /* cull out the crappiest */
2561        while( n-- > max )
2562            closePeer( torrents[n], peers[n] );
2563
2564        /* cleanup */
2565        tr_free( torrents );
2566        tr_free( peers );
2567    }
2568}
2569
2570
2571static int
2572reconnectPulse( void * vmgr )
2573{
2574    tr_torrent * tor;
2575    tr_peerMgr * mgr = vmgr;
2576    uint64_t now;
2577    managerLock( mgr );
2578
2579    now = tr_date( );
2580
2581    /* if we're over the per-torrent peer limits, cull some peers */
2582    tor = NULL;
2583    while(( tor = tr_torrentNext( mgr->session, tor )))
2584        if( tor->isRunning )
2585            enforceTorrentPeerLimit( tor->torrentPeers, now );
2586
2587    /* if we're over the per-session peer limits, cull some peers */
2588    enforceSessionPeerLimit( mgr->session, now );
2589
2590    tor = NULL;
2591    while(( tor = tr_torrentNext( mgr->session, tor )))
2592        if( tor->isRunning )
2593            reconnectTorrent( tor->torrentPeers );
2594
2595    managerUnlock( mgr );
2596    return TRUE;
2597}
2598
2599/****
2600*****
2601*****  BANDWIDTH ALLOCATION
2602*****
2603****/
2604
2605static void
2606pumpAllPeers( tr_peerMgr * mgr )
2607{
2608    tr_torrent * tor = NULL;
2609
2610    while(( tor = tr_torrentNext( mgr->session, tor )))
2611    {
2612        int j;
2613        Torrent * t = tor->torrentPeers;
2614
2615        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
2616        {
2617            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
2618            tr_peerMsgsPulse( peer->msgs );
2619        }
2620    }
2621}
2622
2623static int
2624bandwidthPulse( void * vmgr )
2625{
2626    tr_torrent * tor = NULL;
2627    tr_peerMgr * mgr = vmgr;
2628    managerLock( mgr );
2629
2630    /* FIXME: this next line probably isn't necessary... */
2631    pumpAllPeers( mgr );
2632
2633    /* allocate bandwidth to the peers */
2634    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
2635    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
2636
2637    /* possibly stop torrents that have seeded enough */
2638    while(( tor = tr_torrentNext( mgr->session, tor ))) {
2639        if( tor->needsSeedRatioCheck ) {
2640            tor->needsSeedRatioCheck = FALSE;
2641            tr_torrentCheckSeedRatio( tor );
2642        }
2643    }
2644
2645    /* possibly stop torrents that have an error */
2646    tor = NULL;
2647    while(( tor = tr_torrentNext( mgr->session, tor )))
2648        if( tor->isRunning && ( tor->error == TR_STAT_LOCAL_ERROR )) 
2649            tr_torrentStop( tor );
2650
2651    managerUnlock( mgr );
2652    return TRUE;
2653}
Note: See TracBrowser for help on using the repository browser.