source: trunk/libtransmission/peer-mgr.c @ 7353

Last change on this file since 7353 was 7353, checked in by charles, 12 years ago

(trunk libT) fix bug which caused libtransmission to hold onto nonproductive peers for longer than it should've

  • Property svn:keywords set to Date Rev Author Id
File size: 64.7 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 7353 2008-12-11 07:04:46Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h> /* memcpy, memcmp, strstr */
16#include <stdlib.h> /* qsort */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "bandwidth.h"
23#include "bencode.h"
24#include "blocklist.h"
25#include "clients.h"
26#include "completion.h"
27#include "crypto.h"
28#include "handshake.h"
29#include "inout.h" /* tr_ioTestPiece */
30#include "net.h"
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-mgr-private.h"
34#include "peer-msgs.h"
35#include "ptrarray.h"
36#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
37#include "torrent.h"
38#include "trevent.h"
39#include "utils.h"
40#include "webseed.h"
41
42enum
43{
44    /* how frequently to change which peers are choked */
45    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
46
47    /* minimum interval for refilling peers' request lists */
48    REFILL_PERIOD_MSEC = 333,
49
50    /* when many peers are available, keep idle ones this long */
51    MIN_UPLOAD_IDLE_SECS = ( 30 ),
52
53    /* when few peers are available, keep idle ones this long */
54    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
55
56    /* how frequently to decide which peers live and die */
57    RECONNECT_PERIOD_MSEC = ( 2 * 1000 ),
58   
59    /* how frequently to reallocate bandwidth */
60    BANDWIDTH_PERIOD_MSEC = 250,
61
62    /* max # of peers to ask fer per torrent per reconnect pulse */
63    MAX_RECONNECTIONS_PER_PULSE = 4,
64
65    /* max number of peers to ask for per second overall.
66    * this throttle is to avoid overloading the router */
67    MAX_CONNECTIONS_PER_SECOND = 8,
68
69    /* number of unchoked peers per torrent.
70     * FIXME: this probably ought to be configurable */
71    MAX_UNCHOKED_PEERS = 14,
72
73    /* number of bad pieces a peer is allowed to send before we ban them */
74    MAX_BAD_PIECES_PER_PEER = 5,
75
76    /* use for bitwise operations w/peer_atom.myflags */
77    MYFLAG_BANNED = 1,
78
79    /* unreachable for now... but not banned.
80     * if they try to connect to us it's okay */
81    MYFLAG_UNREACHABLE = 2,
82
83    /* the minimum we'll wait before attempting to reconnect to a peer */
84    MINIMUM_RECONNECT_INTERVAL_SECS = 5
85};
86
87
88/**
89***
90**/
91
92enum
93{
94    UPLOAD_ONLY_UKNOWN,
95    UPLOAD_ONLY_YES,
96    UPLOAD_ONLY_NO
97};
98
99/* We keep one of these for every peer we know about, whether
100 * it's connected or not, so the struct must be small.
101 * When our current connections underperform, we dip back
102 * into this list for new ones. */
103struct peer_atom
104{
105    uint8_t     from;
106    uint8_t     flags;       /* these match the added_f flags */
107    uint8_t     myflags;     /* flags that aren't defined in added_f */
108    uint8_t     uploadOnly;  /* UPLOAD_ONLY_ */
109    uint8_t     partialSeed;
110    tr_port     port;
111    uint16_t    numFails;
112    tr_address  addr;
113    time_t      time;        /* when the peer's connection status last changed */
114    time_t      piece_data_time;
115};
116
117typedef struct
118{
119    tr_bool         isRunning;
120
121    uint8_t         hash[SHA_DIGEST_LENGTH];
122    int         *   pendingRequestCount;
123    tr_ptrArray *   outgoingHandshakes; /* tr_handshake */
124    tr_ptrArray *   pool; /* struct peer_atom */
125    tr_ptrArray *   peers; /* tr_peer */
126    tr_ptrArray *   webseeds; /* tr_webseed */
127    tr_timer *      reconnectTimer;
128    tr_timer *      rechokeTimer;
129    tr_timer *      refillTimer;
130    tr_torrent *    tor;
131    tr_peer *       optimistic; /* the optimistic peer, or NULL if none */
132
133    struct tr_peerMgr * manager;
134}
135Torrent;
136
137struct tr_peerMgr
138{
139    tr_session      * session;
140    tr_ptrArray     * torrents; /* Torrent */
141    tr_ptrArray     * incomingHandshakes; /* tr_handshake */
142    tr_timer        * bandwidthTimer;
143};
144
145#define tordbg( t, ... ) \
146    do { \
147        if( tr_deepLoggingIsActive( ) ) \
148            tr_deepLog( __FILE__, __LINE__, t->tor->info.name, __VA_ARGS__ ); \
149    } while( 0 )
150
151#define dbgmsg( ... ) \
152    do { \
153        if( tr_deepLoggingIsActive( ) ) \
154            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
155    } while( 0 )
156
157/**
158***
159**/
160
161static void
162managerLock( const struct tr_peerMgr * manager )
163{
164    tr_globalLock( manager->session );
165}
166
167static void
168managerUnlock( const struct tr_peerMgr * manager )
169{
170    tr_globalUnlock( manager->session );
171}
172
173static void
174torrentLock( Torrent * torrent )
175{
176    managerLock( torrent->manager );
177}
178
179static void
180torrentUnlock( Torrent * torrent )
181{
182    managerUnlock( torrent->manager );
183}
184
185static int
186torrentIsLocked( const Torrent * t )
187{
188    return tr_globalIsLocked( t->manager->session );
189}
190
191/**
192***
193**/
194
195static int
196handshakeCompareToAddr( const void * va,
197                        const void * vb )
198{
199    const tr_handshake * a = va;
200
201    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
202}
203
204static int
205handshakeCompare( const void * a,
206                  const void * b )
207{
208    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
209}
210
211static tr_handshake*
212getExistingHandshake( tr_ptrArray      * handshakes,
213                      const tr_address * addr )
214{
215    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
216}
217
218static int
219comparePeerAtomToAddress( const void * va, const void * vb )
220{
221    const struct peer_atom * a = va;
222
223    return tr_compareAddresses( &a->addr, vb );
224}
225
226static int
227comparePeerAtoms( const void * va, const void * vb )
228{
229    const struct peer_atom * b = vb;
230
231    return comparePeerAtomToAddress( va, &b->addr );
232}
233
234/**
235***
236**/
237
238static int
239torrentCompare( const void * va,
240                const void * vb )
241{
242    const Torrent * a = va;
243    const Torrent * b = vb;
244
245    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
246}
247
248static int
249torrentCompareToHash( const void * va,
250                      const void * vb )
251{
252    const Torrent * a = va;
253    const uint8_t * b_hash = vb;
254
255    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
256}
257
258static Torrent*
259getExistingTorrent( tr_peerMgr *    manager,
260                    const uint8_t * hash )
261{
262    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
263                                             hash,
264                                             torrentCompareToHash );
265}
266
267static int
268peerCompare( const void * va, const void * vb )
269{
270    const tr_peer * a = va;
271    const tr_peer * b = vb;
272
273    return tr_compareAddresses( &a->addr, &b->addr );
274}
275
276static int
277peerCompareToAddr( const void * va, const void * vb )
278{
279    const tr_peer * a = va;
280
281    return tr_compareAddresses( &a->addr, vb );
282}
283
284static tr_peer*
285getExistingPeer( Torrent          * torrent,
286                 const tr_address * addr )
287{
288    assert( torrentIsLocked( torrent ) );
289    assert( addr );
290
291    return tr_ptrArrayFindSorted( torrent->peers, addr, peerCompareToAddr );
292}
293
294static struct peer_atom*
295getExistingAtom( const Torrent    * t,
296                 const tr_address * addr )
297{
298    assert( torrentIsLocked( t ) );
299    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
300}
301
302static tr_bool
303peerIsInUse( const Torrent    * ct,
304             const tr_address * addr )
305{
306    Torrent * t = (Torrent*) ct;
307
308    assert( torrentIsLocked ( t ) );
309
310    return getExistingPeer( t, addr )
311        || getExistingHandshake( t->outgoingHandshakes, addr )
312        || getExistingHandshake( t->manager->incomingHandshakes, addr );
313}
314
315static tr_peer*
316peerConstructor( tr_torrent * tor, const tr_address * addr )
317{
318    tr_peer * p;
319    p = tr_new0( tr_peer, 1 );
320    p->addr = *addr;
321    p->bandwidth = tr_bandwidthNew( tor->session, tor->bandwidth );
322    return p;
323}
324
325static tr_peer*
326getPeer( Torrent          * torrent,
327         const tr_address * addr )
328{
329    tr_peer * peer;
330
331    assert( torrentIsLocked( torrent ) );
332
333    peer = getExistingPeer( torrent, addr );
334
335    if( peer == NULL )
336    {
337        peer = peerConstructor( torrent->tor, addr );
338        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
339    }
340
341    return peer;
342}
343
344static void
345peerDestructor( tr_peer * peer )
346{
347    assert( peer );
348
349    if( peer->msgs != NULL )
350    {
351        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
352        tr_peerMsgsFree( peer->msgs );
353    }
354
355    tr_peerIoFree( peer->io );
356
357    tr_bitfieldFree( peer->have );
358    tr_bitfieldFree( peer->blame );
359    tr_free( peer->client );
360
361    tr_bandwidthFree( peer->bandwidth );
362
363    tr_free( peer );
364}
365
366static void
367removePeer( Torrent * t,
368            tr_peer * peer )
369{
370    tr_peer *          removed;
371    struct peer_atom * atom;
372
373    assert( torrentIsLocked( t ) );
374
375    atom = getExistingAtom( t, &peer->addr );
376    assert( atom );
377    atom->time = time( NULL );
378
379    removed = tr_ptrArrayRemoveSorted( t->peers, peer, peerCompare );
380    assert( removed == peer );
381    peerDestructor( removed );
382}
383
384static void
385removeAllPeers( Torrent * t )
386{
387    while( !tr_ptrArrayEmpty( t->peers ) )
388        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
389}
390
391static void
392torrentDestructor( void * vt )
393{
394    Torrent * t = vt;
395    uint8_t   hash[SHA_DIGEST_LENGTH];
396
397    assert( t );
398    assert( !t->isRunning );
399    assert( t->peers );
400    assert( torrentIsLocked( t ) );
401    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
402    assert( tr_ptrArrayEmpty( t->peers ) );
403
404    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
405
406    tr_timerFree( &t->reconnectTimer );
407    tr_timerFree( &t->rechokeTimer );
408    tr_timerFree( &t->refillTimer );
409
410    tr_ptrArrayFree( t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
411    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
412    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
413    tr_ptrArrayFree( t->peers, NULL );
414
415    tr_free( t->pendingRequestCount );
416    tr_free( t );
417}
418
419static void peerCallbackFunc( void * vpeer,
420                              void * vevent,
421                              void * vt );
422
423static Torrent*
424torrentConstructor( tr_peerMgr * manager,
425                    tr_torrent * tor )
426{
427    int       i;
428    Torrent * t;
429
430    t = tr_new0( Torrent, 1 );
431    t->manager = manager;
432    t->tor = tor;
433    t->pool = tr_ptrArrayNew( );
434    t->peers = tr_ptrArrayNew( );
435    t->webseeds = tr_ptrArrayNew( );
436    t->outgoingHandshakes = tr_ptrArrayNew( );
437    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
438
439    for( i = 0; i < tor->info.webseedCount; ++i )
440    {
441        tr_webseed * w =
442            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc,
443                           t );
444        tr_ptrArrayAppend( t->webseeds, w );
445    }
446
447    return t;
448}
449
450
451static int bandwidthPulse( void * vmgr );
452
453
454tr_peerMgr*
455tr_peerMgrNew( tr_session * session )
456{
457    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
458
459    m->session = session;
460    m->torrents = tr_ptrArrayNew( );
461    m->incomingHandshakes = tr_ptrArrayNew( );
462    m->bandwidthTimer = tr_timerNew( session, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
463    return m;
464}
465
466void
467tr_peerMgrFree( tr_peerMgr * manager )
468{
469    managerLock( manager );
470
471    tr_timerFree( &manager->bandwidthTimer );
472
473    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
474     * the item from manager->handshakes, so this is a little roundabout... */
475    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
476        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
477
478    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
479
480    /* free the torrents. */
481    tr_ptrArrayFree( manager->torrents, torrentDestructor );
482
483    managerUnlock( manager );
484    tr_free( manager );
485}
486
487static tr_peer**
488getConnectedPeers( Torrent * t, int * setmeCount )
489{
490    int i, peerCount, connectionCount;
491    tr_peer **peers;
492    tr_peer **ret;
493
494    assert( torrentIsLocked( t ) );
495
496    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
497    ret = tr_new( tr_peer *, peerCount );
498
499    for( i = connectionCount = 0; i < peerCount; ++i )
500        if( peers[i]->msgs )
501            ret[connectionCount++] = peers[i];
502
503    *setmeCount = connectionCount;
504    return ret;
505}
506
507static int
508clientIsDownloadingFrom( const tr_peer * peer )
509{
510    return peer->clientIsInterested && !peer->clientIsChoked;
511}
512
513static int
514clientIsUploadingTo( const tr_peer * peer )
515{
516    return peer->peerIsInterested && !peer->peerIsChoked;
517}
518
519/***
520****
521***/
522
523tr_bool
524tr_peerMgrPeerIsSeed( const tr_peerMgr   * mgr,
525                      const uint8_t      * torrentHash,
526                      const tr_address   * addr )
527{
528    tr_bool isSeed = FALSE;
529    const Torrent * t = NULL;
530    const struct peer_atom * atom = NULL;
531
532    t = getExistingTorrent( (tr_peerMgr*)mgr, torrentHash );
533    if( t )
534        atom = getExistingAtom( t, addr );
535    if( atom )
536        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
537
538    return isSeed;
539}
540
541/****
542*****
543*****  REFILL
544*****
545****/
546
547static void
548assertValidPiece( Torrent * t, tr_piece_index_t piece )
549{
550    assert( t );
551    assert( t->tor );
552    assert( piece < t->tor->info.pieceCount );
553}
554
555static int
556getPieceRequests( Torrent * t, tr_piece_index_t piece )
557{
558    assertValidPiece( t, piece );
559
560    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
561}
562
563static void
564incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
565{
566    assertValidPiece( t, piece );
567
568    if( t->pendingRequestCount == NULL )
569        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
570    t->pendingRequestCount[piece]++;
571}
572
573static void
574decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
575{
576    assertValidPiece( t, piece );
577
578    if( t->pendingRequestCount )
579        t->pendingRequestCount[piece]--;
580}
581
582struct tr_refill_piece
583{
584    tr_priority_t    priority;
585    uint32_t         piece;
586    uint32_t         peerCount;
587    int              random;
588    int              pendingRequestCount;
589    int              missingBlockCount;
590};
591
592static int
593compareRefillPiece( const void * aIn, const void * bIn )
594{
595    const struct tr_refill_piece * a = aIn;
596    const struct tr_refill_piece * b = bIn;
597
598    /* if one piece has a higher priority, it goes first */
599    if( a->priority != b->priority )
600        return a->priority > b->priority ? -1 : 1;
601
602    /* have a per-priority endgame */
603    if( a->pendingRequestCount != b->pendingRequestCount )
604        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
605
606    /* fewer missing pieces goes first */
607    if( a->missingBlockCount != b->missingBlockCount )
608        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
609
610    /* otherwise if one has fewer peers, it goes first */
611    if( a->peerCount != b->peerCount )
612        return a->peerCount < b->peerCount ? -1 : 1;
613
614    /* otherwise go with our random seed */
615    if( a->random != b->random )
616        return a->random < b->random ? -1 : 1;
617
618    return 0;
619}
620
621static tr_piece_index_t *
622getPreferredPieces( Torrent * t, tr_piece_index_t  * pieceCount )
623{
624    const tr_torrent  * tor = t->tor;
625    const tr_info     * inf = &tor->info;
626    tr_piece_index_t    i;
627    tr_piece_index_t    poolSize = 0;
628    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
629    int                 peerCount;
630    tr_peer**           peers;
631
632    assert( torrentIsLocked( t ) );
633
634    peers = getConnectedPeers( t, &peerCount );
635
636    /* make a list of the pieces that we want but don't have */
637    for( i = 0; i < inf->pieceCount; ++i )
638        if( !tor->info.pieces[i].dnd
639                && !tr_cpPieceIsComplete( tor->completion, i ) )
640            pool[poolSize++] = i;
641
642    /* sort the pool by which to request next */
643    if( poolSize > 1 )
644    {
645        tr_piece_index_t j;
646        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
647
648        for( j = 0; j < poolSize; ++j )
649        {
650            int k;
651            const tr_piece_index_t piece = pool[j];
652            struct tr_refill_piece * setme = p + j;
653
654            setme->piece = piece;
655            setme->priority = inf->pieces[piece].priority;
656            setme->peerCount = 0;
657            setme->random = tr_cryptoWeakRandInt( INT_MAX );
658            setme->pendingRequestCount = getPieceRequests( t, piece );
659            setme->missingBlockCount
660                         = tr_cpMissingBlocksInPiece( tor->completion, piece );
661
662            for( k = 0; k < peerCount; ++k )
663            {
664                const tr_peer * peer = peers[k];
665                if( peer->peerIsInterested
666                        && !peer->clientIsChoked
667                        && tr_bitfieldHas( peer->have, piece ) )
668                    ++setme->peerCount;
669            }
670        }
671
672        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
673               compareRefillPiece );
674
675        for( j = 0; j < poolSize; ++j )
676            pool[j] = p[j].piece;
677
678        tr_free( p );
679    }
680
681    tr_free( peers );
682
683    *pieceCount = poolSize;
684    return pool;
685}
686
687struct tr_blockIterator
688{
689    Torrent * t;
690    tr_block_index_t blockIndex, blockCount, *blocks;
691    tr_piece_index_t pieceIndex, pieceCount, *pieces;
692};
693
694static struct tr_blockIterator*
695blockIteratorNew( Torrent * t )
696{
697    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
698    i->t = t;
699    i->pieces = getPreferredPieces( t, &i->pieceCount );
700    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCount );
701    return i;
702}
703
704static int
705blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
706{
707    int found;
708    Torrent * t = i->t;
709    tr_torrent * tor = t->tor;
710
711    while( ( i->blockIndex == i->blockCount )
712        && ( i->pieceIndex < i->pieceCount ) )
713    {
714        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
715        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
716        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
717        tr_block_index_t block;
718
719        assert( index < tor->info.pieceCount );
720
721        i->blockCount = 0;
722        i->blockIndex = 0;
723        for( block=b; block!=e; ++block )
724            if( !tr_cpBlockIsComplete( tor->completion, block ) )
725                i->blocks[i->blockCount++] = block;
726    }
727
728    if(( found = ( i->blockIndex < i->blockCount )))
729        *setme = i->blocks[i->blockIndex++];
730
731    return found;
732}
733
734static void
735blockIteratorFree( struct tr_blockIterator * i )
736{
737    tr_free( i->blocks );
738    tr_free( i->pieces );
739    tr_free( i );
740}
741
742static tr_peer**
743getPeersUploadingToClient( Torrent * t,
744                           int *     setmeCount )
745{
746    int j;
747    int peerCount = 0;
748    int retCount = 0;
749    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
750    tr_peer ** ret = tr_new( tr_peer *, peerCount );
751
752    j = 0; /* this is a temporary test to make sure we walk through all the peers */
753    if( peerCount )
754    {
755        /* Get a list of peers we're downloading from.
756           Pick a different starting point each time so all peers
757           get a chance at being the first in line */
758        const int fencepost = tr_cryptoWeakRandInt( peerCount );
759        int i = fencepost;
760        do {
761            if( clientIsDownloadingFrom( peers[i] ) )
762                ret[retCount++] = peers[i];
763            i = ( i + 1 ) % peerCount;
764            ++j;
765        } while( i != fencepost );
766    }
767    assert( j == peerCount );
768    *setmeCount = retCount;
769    return ret;
770}
771
772static uint32_t
773getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
774{
775    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
776    const uint64_t blockPos = tor->blockSize * b;
777    assert( blockPos >= piecePos );
778    return (uint32_t)( blockPos - piecePos );
779}
780
781static int
782refillPulse( void * vtorrent )
783{
784    tr_block_index_t block;
785    int peerCount;
786    int webseedCount;
787    tr_peer ** peers;
788    tr_webseed ** webseeds;
789    struct tr_blockIterator * blockIterator;
790    Torrent * t = vtorrent;
791    tr_torrent * tor = t->tor;
792
793    if( !t->isRunning )
794        return TRUE;
795    if( tr_torrentIsSeed( t->tor ) )
796        return TRUE;
797
798    torrentLock( t );
799    tordbg( t, "Refilling Request Buffers..." );
800
801    blockIterator = blockIteratorNew( t );
802    peers = getPeersUploadingToClient( t, &peerCount );
803    webseedCount = tr_ptrArraySize( t->webseeds );
804    webseeds = tr_memdup( tr_ptrArrayBase( t->webseeds ),
805                          webseedCount * sizeof( tr_webseed* ) );
806
807    while( ( webseedCount || peerCount )
808        && blockIteratorNext( blockIterator, &block ) )
809    {
810        int j;
811        int handled = FALSE;
812
813        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
814        const uint32_t offset = getBlockOffsetInPiece( tor, block );
815        const uint32_t length = tr_torBlockCountBytes( tor, block );
816
817        /* find a peer who can ask for this block */
818        for( j=0; !handled && j<peerCount; )
819        {
820            const tr_addreq_t val = tr_peerMsgsAddRequest( peers[j]->msgs, index, offset, length, FALSE );
821            switch( val )
822            {
823                case TR_ADDREQ_FULL:
824                case TR_ADDREQ_CLIENT_CHOKED:
825                    peers[j] = peers[--peerCount];
826                    break;
827
828                case TR_ADDREQ_MISSING:
829                case TR_ADDREQ_DUPLICATE:
830                    ++j;
831                    break;
832
833                case TR_ADDREQ_OK:
834                    incrementPieceRequests( t, index );
835                    handled = TRUE;
836                    break;
837
838                default:
839                    assert( 0 && "unhandled value" );
840                    break;
841            }
842        }
843
844        /* maybe one of the webseeds can do it */
845        for( j=0; !handled && j<webseedCount; )
846        {
847            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j],
848                                                          index, offset, length );
849            switch( val )
850            {
851                case TR_ADDREQ_FULL:
852                    webseeds[j] = webseeds[--webseedCount];
853                    break;
854
855                case TR_ADDREQ_OK:
856                    incrementPieceRequests( t, index );
857                    handled = TRUE;
858                    break;
859
860                default:
861                    assert( 0 && "unhandled value" );
862                    break;
863            }
864        }
865    }
866
867    /* cleanup */
868    blockIteratorFree( blockIterator );
869    tr_free( webseeds );
870    tr_free( peers );
871
872    t->refillTimer = NULL;
873    torrentUnlock( t );
874    return FALSE;
875}
876
877static void
878broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
879{
880    int i, size;
881    tr_peer ** peers;
882
883    assert( torrentIsLocked( t ) );
884
885    peers = getConnectedPeers( t, &size );
886    for( i=0; i<size; ++i )
887        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
888    tr_free( peers );
889}
890
891static void
892addStrike( Torrent * t,
893           tr_peer * peer )
894{
895    tordbg( t, "increasing peer %s strike count to %d",
896            tr_peerIoAddrStr( &peer->addr,
897                              peer->port ), peer->strikes + 1 );
898
899    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
900    {
901        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
902        atom->myflags |= MYFLAG_BANNED;
903        peer->doPurge = 1;
904        tordbg( t, "banning peer %s",
905               tr_peerIoAddrStr( &atom->addr, atom->port ) );
906    }
907}
908
909static void
910gotBadPiece( Torrent *        t,
911             tr_piece_index_t pieceIndex )
912{
913    tr_torrent *   tor = t->tor;
914    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
915
916    tor->corruptCur += byteCount;
917    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
918}
919
920static void
921refillSoon( Torrent * t )
922{
923    if( t->refillTimer == NULL )
924        t->refillTimer = tr_timerNew( t->manager->session,
925                                      refillPulse, t,
926                                      REFILL_PERIOD_MSEC );
927}
928
929static void
930peerSuggestedPiece( Torrent            * t,
931                    tr_peer            * peer,
932                    tr_piece_index_t     pieceIndex,
933                    int                  isFastAllowed )
934{
935    assert( t );
936    assert( peer );
937    assert( peer->msgs );
938
939    /* is this a valid piece? */
940    if(  pieceIndex >= t->tor->info.pieceCount )
941        return;
942
943    /* don't ask for it if we've already got it */
944    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
945        return;
946
947    /* don't ask for it if they don't have it */
948    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
949        return;
950
951    /* don't ask for it if we're choked and it's not fast */
952    if( !isFastAllowed && peer->clientIsChoked )
953        return;
954
955    /* request the blocks that we don't have in this piece */
956    {
957        tr_block_index_t block;
958        const tr_torrent * tor = t->tor;
959        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
960        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
961
962        for( block=start; block<end; ++block )
963        {
964            if( !tr_cpBlockIsComplete( tor->completion, block ) )
965            {
966                const uint32_t offset = getBlockOffsetInPiece( tor, block );
967                const uint32_t length = tr_torBlockCountBytes( tor, block );
968                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length, TRUE );
969                incrementPieceRequests( t, pieceIndex );
970            }
971        }
972    }
973}
974
975static void
976peerCallbackFunc( void * vpeer,
977                  void * vevent,
978                  void * vt )
979{
980    tr_peer             * peer = vpeer; /* may be NULL if peer is a webseed */
981    Torrent             * t = vt;
982    const tr_peer_event * e = vevent;
983
984    torrentLock( t );
985
986    switch( e->eventType )
987    {
988        case TR_PEER_UPLOAD_ONLY:
989            /* update our atom */
990            if( peer ) {
991                struct peer_atom * a = getExistingAtom( t, &peer->addr );
992                a->uploadOnly = e->uploadOnly ? UPLOAD_ONLY_YES : UPLOAD_ONLY_NO;
993            }
994            break;
995
996        case TR_PEER_NEED_REQ:
997            refillSoon( t );
998            break;
999
1000        case TR_PEER_CANCEL:
1001            decrementPieceRequests( t, e->pieceIndex );
1002            break;
1003
1004        case TR_PEER_PEER_GOT_DATA:
1005        {
1006            const time_t now = time( NULL );
1007            tr_torrent * tor = t->tor;
1008
1009            tor->activityDate = now;
1010
1011            if( e->wasPieceData )
1012                tor->uploadedCur += e->length;
1013
1014            /* update the stats */
1015            if( e->wasPieceData )
1016                tr_statsAddUploaded( tor->session, e->length );
1017
1018            /* update our atom */
1019            if( peer ) {
1020                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1021                if( e->wasPieceData )
1022                    a->piece_data_time = now;
1023            }
1024
1025            break;
1026        }
1027
1028        case TR_PEER_CLIENT_GOT_SUGGEST:
1029            if( peer )
1030                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1031            break;
1032
1033        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1034            if( peer )
1035                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1036            break;
1037
1038        case TR_PEER_CLIENT_GOT_DATA:
1039        {
1040            const time_t now = time( NULL );
1041            tr_torrent * tor = t->tor;
1042            tor->activityDate = now;
1043
1044            /* only add this to downloadedCur if we got it from a peer --
1045             * webseeds shouldn't count against our ratio.  As one tracker
1046             * admin put it, "Those pieces are downloaded directly from the
1047             * content distributor, not the peers, it is the tracker's job
1048             * to manage the swarms, not the web server and does not fit
1049             * into the jurisdiction of the tracker." */
1050            if( peer && e->wasPieceData )
1051                tor->downloadedCur += e->length;
1052
1053            /* update the stats */ 
1054            if( e->wasPieceData )
1055                tr_statsAddDownloaded( tor->session, e->length );
1056
1057            /* update our atom */
1058            if( peer ) {
1059                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1060                if( e->wasPieceData )
1061                    a->piece_data_time = now;
1062            }
1063
1064            break;
1065        }
1066
1067        case TR_PEER_PEER_PROGRESS:
1068        {
1069            if( peer )
1070            {
1071                struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1072                const int peerIsSeed = e->progress >= 1.0;
1073                if( peerIsSeed ) {
1074                    tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1075                    atom->flags |= ADDED_F_SEED_FLAG;
1076                } else {
1077                    tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1078                    atom->flags &= ~ADDED_F_SEED_FLAG;
1079                }
1080            }
1081            break;
1082        }
1083
1084        case TR_PEER_CLIENT_GOT_BLOCK:
1085        {
1086            tr_torrent *     tor = t->tor;
1087
1088            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1089
1090            tr_cpBlockAdd( tor->completion, block );
1091            decrementPieceRequests( t, e->pieceIndex );
1092
1093            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1094
1095            if( tr_cpPieceIsComplete( tor->completion, e->pieceIndex ) )
1096            {
1097                const tr_piece_index_t p = e->pieceIndex;
1098                const tr_bool ok = tr_ioTestPiece( tor, p );
1099
1100                if( !ok )
1101                {
1102                    tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1103                               (unsigned long)p );
1104                }
1105
1106                tr_torrentSetHasPiece( tor, p, ok );
1107                tr_torrentSetPieceChecked( tor, p, TRUE );
1108                tr_peerMgrSetBlame( tor->session->peerMgr, tor->info.hash, p, ok );
1109
1110                if( !ok )
1111                    gotBadPiece( t, p );
1112                else
1113                {
1114                    int i, peerCount;
1115                    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1116                    for( i = 0; i < peerCount; ++i )
1117                        tr_peerMsgsHave( peers[i]->msgs, p );
1118                    tr_free( peers );
1119                }
1120
1121                tr_torrentRecheckCompleteness( tor );
1122            }
1123            break;
1124        }
1125
1126        case TR_PEER_ERROR:
1127            if( e->err == EINVAL )
1128            {
1129                addStrike( t, peer );
1130                peer->doPurge = 1;
1131            }
1132            else if( ( e->err == ERANGE )
1133                  || ( e->err == EMSGSIZE )
1134                  || ( e->err == ENOTCONN ) )
1135            {
1136                /* some protocol error from the peer */
1137                peer->doPurge = 1;
1138            }
1139            else /* a local error, such as an IO error */
1140            {
1141                t->tor->error = e->err;
1142                tr_strlcpy( t->tor->errorString,
1143                            tr_strerror( t->tor->error ),
1144                            sizeof( t->tor->errorString ) );
1145                tr_torrentStop( t->tor );
1146            }
1147            break;
1148
1149        default:
1150            assert( 0 );
1151    }
1152
1153    torrentUnlock( t );
1154}
1155
1156static void
1157ensureAtomExists( Torrent          * t,
1158                  const tr_address * addr,
1159                  tr_port            port,
1160                  uint8_t            flags,
1161                  uint8_t            from )
1162{
1163    if( getExistingAtom( t, addr ) == NULL )
1164    {
1165        struct peer_atom * a;
1166        a = tr_new0( struct peer_atom, 1 );
1167        a->addr = *addr;
1168        a->port = port;
1169        a->flags = flags;
1170        a->from = from;
1171        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr( &a->addr, a->port ) );
1172        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
1173    }
1174}
1175
1176static int
1177getMaxPeerCount( const tr_torrent * tor )
1178{
1179    return tor->maxConnectedPeers;
1180}
1181
1182static int
1183getPeerCount( const Torrent * t )
1184{
1185    return tr_ptrArraySize( t->peers ) + tr_ptrArraySize( t->outgoingHandshakes );
1186}
1187
1188/* FIXME: this is kind of a mess. */
1189static tr_bool
1190myHandshakeDoneCB( tr_handshake *  handshake,
1191                   tr_peerIo *     io,
1192                   int             isConnected,
1193                   const uint8_t * peer_id,
1194                   void *          vmanager )
1195{
1196    tr_bool            ok = isConnected;
1197    tr_bool            success = FALSE;
1198    tr_port            port;
1199    const tr_address * addr;
1200    tr_peerMgr       * manager = vmanager;
1201    Torrent          * t;
1202    tr_handshake     * ours;
1203
1204    assert( io );
1205    assert( isConnected == 0 || isConnected == 1 );
1206
1207    t = tr_peerIoHasTorrentHash( io )
1208        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1209        : NULL;
1210
1211    if( tr_peerIoIsIncoming ( io ) )
1212        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
1213                                        handshake, handshakeCompare );
1214    else if( t )
1215        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
1216                                        handshake, handshakeCompare );
1217    else
1218        ours = handshake;
1219
1220    assert( ours );
1221    assert( ours == handshake );
1222
1223    if( t )
1224        torrentLock( t );
1225
1226    addr = tr_peerIoGetAddress( io, &port );
1227
1228    if( !ok || !t || !t->isRunning )
1229    {
1230        if( t )
1231        {
1232            struct peer_atom * atom = getExistingAtom( t, addr );
1233            if( atom )
1234                ++atom->numFails;
1235        }
1236
1237        tr_peerIoFree( io );
1238    }
1239    else /* looking good */
1240    {
1241        struct peer_atom * atom;
1242        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1243        atom = getExistingAtom( t, addr );
1244        atom->time = time( NULL );
1245        atom->piece_data_time = 0;
1246
1247        if( atom->myflags & MYFLAG_BANNED )
1248        {
1249            tordbg( t, "banned peer %s tried to reconnect",
1250                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
1251            tr_peerIoFree( io );
1252        }
1253        else if( tr_peerIoIsIncoming( io )
1254               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1255
1256        {
1257            tr_peerIoFree( io );
1258        }
1259        else
1260        {
1261            tr_peer * peer = getExistingPeer( t, addr );
1262
1263            if( peer ) /* we already have this peer */
1264            {
1265                tr_peerIoFree( io );
1266            }
1267            else
1268            {
1269                peer = getPeer( t, addr );
1270                tr_free( peer->client );
1271
1272                if( !peer_id )
1273                    peer->client = NULL;
1274                else {
1275                    char client[128];
1276                    tr_clientForId( client, sizeof( client ), peer_id );
1277                    peer->client = tr_strdup( client );
1278                }
1279
1280                peer->port = port;
1281                peer->io = io;
1282                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1283                tr_peerIoSetBandwidth( io, peer->bandwidth );
1284
1285                success = TRUE;
1286            }
1287        }
1288    }
1289
1290    if( t )
1291        torrentUnlock( t );
1292
1293    return success;
1294}
1295
1296void
1297tr_peerMgrAddIncoming( tr_peerMgr * manager,
1298                       tr_address * addr,
1299                       tr_port      port,
1300                       int          socket )
1301{
1302    managerLock( manager );
1303
1304    if( tr_sessionIsAddressBlocked( manager->session, addr ) )
1305    {
1306        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
1307        tr_netClose( socket );
1308    }
1309    else if( getExistingHandshake( manager->incomingHandshakes, addr ) )
1310    {
1311        tr_netClose( socket );
1312    }
1313    else /* we don't have a connetion to them yet... */
1314    {
1315        tr_peerIo *    io;
1316        tr_handshake * handshake;
1317
1318        io = tr_peerIoNewIncoming( manager->session, addr, port, socket );
1319
1320        handshake = tr_handshakeNew( io,
1321                                     manager->session->encryptionMode,
1322                                     myHandshakeDoneCB,
1323                                     manager );
1324
1325        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake,
1326                                 handshakeCompare );
1327    }
1328
1329    managerUnlock( manager );
1330}
1331
1332void
1333tr_peerMgrAddPex( tr_peerMgr *    manager,
1334                  const uint8_t * torrentHash,
1335                  uint8_t         from,
1336                  const tr_pex *  pex )
1337{
1338    Torrent * t;
1339
1340    managerLock( manager );
1341
1342    t = getExistingTorrent( manager, torrentHash );
1343    if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
1344        ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from );
1345
1346    managerUnlock( manager );
1347}
1348
1349tr_pex *
1350tr_peerMgrCompactToPex( const void *    compact,
1351                        size_t          compactLen,
1352                        const uint8_t * added_f,
1353                        size_t          added_f_len,
1354                        size_t *        pexCount )
1355{
1356    size_t          i;
1357    size_t          n = compactLen / 6;
1358    const uint8_t * walk = compact;
1359    tr_pex *        pex = tr_new0( tr_pex, n );
1360
1361    for( i = 0; i < n; ++i )
1362    {
1363        pex[i].addr.type = TR_AF_INET;
1364        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
1365        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1366        if( added_f && ( n == added_f_len ) )
1367            pex[i].flags = added_f[i];
1368    }
1369
1370    *pexCount = n;
1371    return pex;
1372}
1373
1374/**
1375***
1376**/
1377
1378void
1379tr_peerMgrSetBlame( tr_peerMgr *     manager,
1380                    const uint8_t *  torrentHash,
1381                    tr_piece_index_t pieceIndex,
1382                    int              success )
1383{
1384    if( !success )
1385    {
1386        int        peerCount, i;
1387        Torrent *  t = getExistingTorrent( manager, torrentHash );
1388        tr_peer ** peers;
1389
1390        assert( torrentIsLocked( t ) );
1391
1392        peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1393        for( i = 0; i < peerCount; ++i )
1394        {
1395            tr_peer * peer = peers[i];
1396            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1397            {
1398                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1399                        tr_peerIoAddrStr( &peer->addr, peer->port ),
1400                        pieceIndex, (int)peer->strikes + 1 );
1401                addStrike( t, peer );
1402            }
1403        }
1404    }
1405}
1406
1407int
1408tr_pexCompare( const void * va, const void * vb )
1409{
1410    const tr_pex * a = va;
1411    const tr_pex * b = vb;
1412    int i = tr_compareAddresses( &a->addr, &b->addr );
1413
1414    if( i ) return i;
1415    if( a->port < b->port ) return -1;
1416    if( a->port > b->port ) return 1;
1417    return 0;
1418}
1419
1420static int
1421peerPrefersCrypto( const tr_peer * peer )
1422{
1423    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1424        return TRUE;
1425
1426    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1427        return FALSE;
1428
1429    return tr_peerIoIsEncrypted( peer->io );
1430}
1431
1432int
1433tr_peerMgrGetPeers( tr_peerMgr *    manager,
1434                    const uint8_t * torrentHash,
1435                    tr_pex **       setme_pex )
1436{
1437    int peerCount = 0;
1438    const Torrent *  t;
1439
1440    managerLock( manager );
1441
1442    t = getExistingTorrent( manager, torrentHash );
1443    if( t == NULL )
1444    {
1445        *setme_pex = NULL;
1446    }
1447    else
1448    {
1449        int i;
1450        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1451        tr_pex * pex = tr_new( tr_pex, peerCount );
1452        tr_pex * walk = pex;
1453
1454        for( i=0; i<peerCount; ++i, ++walk )
1455        {
1456            const tr_peer * peer = peers[i];
1457            const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1458
1459            walk->addr = peer->addr;
1460            walk->port = peer->port;
1461            walk->flags = 0;
1462            if( peerPrefersCrypto( peer ) )
1463                walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1464            if( ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 ) )
1465                walk->flags |= ADDED_F_SEED_FLAG;
1466        }
1467
1468        assert( ( walk - pex ) == peerCount );
1469        qsort( pex, peerCount, sizeof( tr_pex ), tr_pexCompare );
1470        *setme_pex = pex;
1471    }
1472
1473    managerUnlock( manager );
1474    return peerCount;
1475}
1476
1477static int reconnectPulse( void * vtorrent );
1478
1479static int rechokePulse( void * vtorrent );
1480
1481void
1482tr_peerMgrStartTorrent( tr_peerMgr *    manager,
1483                        const uint8_t * torrentHash )
1484{
1485    Torrent * t;
1486
1487    managerLock( manager );
1488
1489    t = getExistingTorrent( manager, torrentHash );
1490
1491    assert( t );
1492    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1493    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1494
1495    if( !t->isRunning )
1496    {
1497        t->isRunning = 1;
1498
1499        t->reconnectTimer = tr_timerNew( t->manager->session,
1500                                         reconnectPulse, t,
1501                                         RECONNECT_PERIOD_MSEC );
1502
1503        t->rechokeTimer = tr_timerNew( t->manager->session,
1504                                       rechokePulse, t,
1505                                       RECHOKE_PERIOD_MSEC );
1506
1507        reconnectPulse( t );
1508
1509        rechokePulse( t );
1510
1511        if( !tr_ptrArrayEmpty( t->webseeds ) )
1512            refillSoon( t );
1513    }
1514
1515    managerUnlock( manager );
1516}
1517
1518static void
1519stopTorrent( Torrent * t )
1520{
1521    assert( torrentIsLocked( t ) );
1522
1523    t->isRunning = 0;
1524    tr_timerFree( &t->rechokeTimer );
1525    tr_timerFree( &t->reconnectTimer );
1526
1527    /* disconnect the peers. */
1528    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)peerDestructor );
1529    tr_ptrArrayClear( t->peers );
1530
1531    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1532     * which removes the handshake from t->outgoingHandshakes... */
1533    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1534        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1535}
1536
1537void
1538tr_peerMgrStopTorrent( tr_peerMgr *    manager,
1539                       const uint8_t * torrentHash )
1540{
1541    managerLock( manager );
1542
1543    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1544
1545    managerUnlock( manager );
1546}
1547
1548void
1549tr_peerMgrAddTorrent( tr_peerMgr * manager,
1550                      tr_torrent * tor )
1551{
1552    Torrent * t;
1553
1554    managerLock( manager );
1555
1556    assert( tor );
1557    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1558
1559    t = torrentConstructor( manager, tor );
1560    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1561
1562    managerUnlock( manager );
1563}
1564
1565void
1566tr_peerMgrRemoveTorrent( tr_peerMgr *    manager,
1567                         const uint8_t * torrentHash )
1568{
1569    Torrent * t;
1570
1571    managerLock( manager );
1572
1573    t = getExistingTorrent( manager, torrentHash );
1574    assert( t );
1575    stopTorrent( t );
1576    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1577    torrentDestructor( t );
1578
1579    managerUnlock( manager );
1580}
1581
1582void
1583tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1584                               const uint8_t *    torrentHash,
1585                               int8_t *           tab,
1586                               unsigned int       tabCount )
1587{
1588    tr_piece_index_t   i;
1589    const Torrent *    t;
1590    const tr_torrent * tor;
1591    float              interval;
1592    int                isSeed;
1593    int                peerCount;
1594    const tr_peer **   peers;
1595
1596    managerLock( manager );
1597
1598    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1599    tor = t->tor;
1600    interval = tor->info.pieceCount / (float)tabCount;
1601    isSeed = tor && ( tr_cpGetStatus ( tor->completion ) == TR_SEED );
1602    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1603
1604    memset( tab, 0, tabCount );
1605
1606    for( i = 0; tor && i < tabCount; ++i )
1607    {
1608        const int piece = i * interval;
1609
1610        if( isSeed || tr_cpPieceIsComplete( tor->completion, piece ) )
1611            tab[i] = -1;
1612        else if( peerCount )
1613        {
1614            int j;
1615            for( j = 0; j < peerCount; ++j )
1616                if( tr_bitfieldHas( peers[j]->have, i ) )
1617                    ++tab[i];
1618        }
1619    }
1620
1621    managerUnlock( manager );
1622}
1623
1624/* Returns the pieces that are available from peers */
1625tr_bitfield*
1626tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1627                        const uint8_t *    torrentHash )
1628{
1629    int           i, size;
1630    Torrent *     t;
1631    tr_peer **    peers;
1632    tr_bitfield * pieces;
1633    managerLock( manager );
1634
1635    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1636    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1637    peers = getConnectedPeers( t, &size );
1638    for( i = 0; i < size; ++i )
1639        tr_bitfieldOr( pieces, peers[i]->have );
1640
1641    managerUnlock( manager );
1642    tr_free( peers );
1643    return pieces;
1644}
1645
1646int
1647tr_peerMgrHasConnections( const tr_peerMgr * manager,
1648                          const uint8_t *    torrentHash )
1649{
1650    int ret;
1651    const Torrent * t;
1652    managerLock( manager );
1653
1654    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1655    ret = t && ( !tr_ptrArrayEmpty( t->peers ) || !tr_ptrArrayEmpty( t->webseeds ) );
1656
1657    managerUnlock( manager );
1658    return ret;
1659}
1660
1661void
1662tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1663                        const uint8_t    * torrentHash,
1664                        int              * setmePeersKnown,
1665                        int              * setmePeersConnected,
1666                        int              * setmeSeedsConnected,
1667                        int              * setmeWebseedsSendingToUs,
1668                        int              * setmePeersSendingToUs,
1669                        int              * setmePeersGettingFromUs,
1670                        int              * setmePeersFrom )
1671{
1672    int i, size;
1673    const Torrent * t;
1674    const tr_peer ** peers;
1675    const tr_webseed ** webseeds;
1676
1677    managerLock( manager );
1678
1679    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1680    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1681
1682    *setmePeersKnown           = tr_ptrArraySize( t->pool );
1683    *setmePeersConnected       = 0;
1684    *setmeSeedsConnected       = 0;
1685    *setmePeersGettingFromUs   = 0;
1686    *setmePeersSendingToUs     = 0;
1687    *setmeWebseedsSendingToUs  = 0;
1688
1689    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1690        setmePeersFrom[i] = 0;
1691
1692    for( i=0; i<size; ++i )
1693    {
1694        const tr_peer * peer = peers[i];
1695        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1696
1697        if( peer->io == NULL ) /* not connected */
1698            continue;
1699
1700        ++*setmePeersConnected;
1701
1702        ++setmePeersFrom[atom->from];
1703
1704        if( clientIsDownloadingFrom( peer ) )
1705            ++*setmePeersSendingToUs;
1706
1707        if( clientIsUploadingTo( peer ) )
1708            ++*setmePeersGettingFromUs;
1709
1710        if( atom->flags & ADDED_F_SEED_FLAG )
1711            ++*setmeSeedsConnected;
1712    }
1713
1714    webseeds = (const tr_webseed **) tr_ptrArrayPeek( t->webseeds, &size );
1715    for( i=0; i<size; ++i )
1716    {
1717        if( tr_webseedIsActive( webseeds[i] ) )
1718            ++*setmeWebseedsSendingToUs;
1719    }
1720
1721    managerUnlock( manager );
1722}
1723
1724float*
1725tr_peerMgrWebSpeeds( const tr_peerMgr * manager,
1726                     const uint8_t *    torrentHash )
1727{
1728    const Torrent * t;
1729    const tr_webseed ** webseeds;
1730    int i;
1731    int webseedCount;
1732    float * ret;
1733
1734    assert( manager );
1735    managerLock( manager );
1736
1737    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1738    webseeds = (const tr_webseed**) tr_ptrArrayPeek( t->webseeds,
1739                                                     &webseedCount );
1740    assert( webseedCount == t->tor->info.webseedCount );
1741    ret = tr_new0( float, webseedCount );
1742
1743    for( i=0; i<webseedCount; ++i )
1744        if( !tr_webseedGetSpeed( webseeds[i], &ret[i] ) )
1745            ret[i] = -1.0;
1746
1747    managerUnlock( manager );
1748    return ret;
1749}
1750
1751double
1752tr_peerGetPieceSpeed( const tr_peer * peer, tr_direction direction )
1753{
1754    assert( peer );
1755    assert( direction==TR_CLIENT_TO_PEER || direction==TR_PEER_TO_CLIENT );
1756
1757    return tr_bandwidthGetPieceSpeed( peer->bandwidth, direction );
1758}
1759
1760
1761struct tr_peer_stat *
1762tr_peerMgrPeerStats( const   tr_peerMgr  * manager,
1763                     const   uint8_t     * torrentHash,
1764                     int                 * setmeCount UNUSED )
1765{
1766    int             i, size;
1767    const Torrent * t;
1768    tr_peer **      peers;
1769    tr_peer_stat *  ret;
1770
1771    assert( manager );
1772    managerLock( manager );
1773
1774    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1775    peers = getConnectedPeers( (Torrent*)t, &size );
1776    ret = tr_new0( tr_peer_stat, size );
1777
1778    for( i = 0; i < size; ++i )
1779    {
1780        char *                   pch;
1781        const tr_peer *          peer = peers[i];
1782        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1783        tr_peer_stat *           stat = ret + i;
1784
1785        tr_ntop( &peer->addr, stat->addr, sizeof(stat->addr) );
1786        tr_strlcpy( stat->client, (peer->client ? peer->client : ""), sizeof(stat->client) );
1787        stat->port               = ntohs( peer->port );
1788        stat->from               = atom->from;
1789        stat->progress           = peer->progress;
1790        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1791        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER );
1792        stat->rateToClient       = tr_peerGetPieceSpeed( peer, TR_PEER_TO_CLIENT );
1793        stat->peerIsChoked       = peer->peerIsChoked;
1794        stat->peerIsInterested   = peer->peerIsInterested;
1795        stat->clientIsChoked     = peer->clientIsChoked;
1796        stat->clientIsInterested = peer->clientIsInterested;
1797        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1798        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1799        stat->isUploadingTo      = clientIsUploadingTo( peer );
1800        stat->isSeed             = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
1801
1802        pch = stat->flagStr;
1803        if( t->optimistic == peer ) *pch++ = 'O';
1804        if( stat->isDownloadingFrom ) *pch++ = 'D';
1805        else if( stat->clientIsInterested ) *pch++ = 'd';
1806        if( stat->isUploadingTo ) *pch++ = 'U';
1807        else if( stat->peerIsInterested ) *pch++ = 'u';
1808        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
1809        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1810        if( stat->isEncrypted ) *pch++ = 'E';
1811        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1812        if( stat->isIncoming ) *pch++ = 'I';
1813        *pch = '\0';
1814    }
1815
1816    *setmeCount = size;
1817    tr_free( peers );
1818
1819    managerUnlock( manager );
1820    return ret;
1821}
1822
1823/**
1824***
1825**/
1826
1827struct ChokeData
1828{
1829    tr_bool         doUnchoke;
1830    tr_bool         isInterested;
1831    tr_bool         isChoked;
1832    int             rate;
1833    tr_peer *       peer;
1834};
1835
1836static int
1837compareChoke( const void * va,
1838              const void * vb )
1839{
1840    const struct ChokeData * a = va;
1841    const struct ChokeData * b = vb;
1842
1843    if( a->rate != b->rate ) /* prefer higher overall speeds */
1844        return a->rate > b->rate ? -1 : 1;
1845
1846    if( a->isChoked != b->isChoked ) /* prefer unchoked */
1847        return a->isChoked ? 1 : -1;
1848
1849    return 0;
1850}
1851
1852static int
1853isNew( const tr_peer * peer )
1854{
1855    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1856}
1857
1858static int
1859isSame( const tr_peer * peer )
1860{
1861    return peer && peer->client && strstr( peer->client, "Transmission" );
1862}
1863
1864/**
1865***
1866**/
1867
1868static void
1869rechoke( Torrent * t )
1870{
1871    int                i, peerCount, size, unchokedInterested;
1872    tr_peer **         peers = getConnectedPeers( t, &peerCount );
1873    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1874    const int          chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
1875
1876    assert( torrentIsLocked( t ) );
1877
1878    /* sort the peers by preference and rate */
1879    for( i = 0, size = 0; i < peerCount; ++i )
1880    {
1881        tr_peer * peer = peers[i];
1882        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1883
1884        if( peer->progress >= 1.0 ) /* choke all seeds */
1885        {
1886            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1887        }
1888        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
1889        {
1890            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1891        }
1892        else if( chokeAll ) /* choke everyone if we're not uploading */
1893        {
1894            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1895        }
1896        else
1897        {
1898            struct ChokeData * n = &choke[size++];
1899            n->peer         = peer;
1900            n->isInterested = peer->peerIsInterested;
1901            n->isChoked     = peer->peerIsChoked;
1902            n->rate         = tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER ) * 1024;
1903        }
1904    }
1905
1906    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
1907
1908    /**
1909     * Reciprocation and number of uploads capping is managed by unchoking
1910     * the N peers which have the best upload rate and are interested.
1911     * This maximizes the client's download rate. These N peers are
1912     * referred to as downloaders, because they are interested in downloading
1913     * from the client.
1914     *
1915     * Peers which have a better upload rate (as compared to the downloaders)
1916     * but aren't interested get unchoked. If they become interested, the
1917     * downloader with the worst upload rate gets choked. If a client has
1918     * a complete file, it uses its upload rate rather than its download
1919     * rate to decide which peers to unchoke.
1920     */
1921    unchokedInterested = 0;
1922    for( i=0; i<size && unchokedInterested<MAX_UNCHOKED_PEERS; ++i ) {
1923        choke[i].doUnchoke = 1;
1924        if( choke[i].isInterested )
1925            ++unchokedInterested;
1926    }
1927
1928    /* optimistic unchoke */
1929    if( i < size )
1930    {
1931        int n;
1932        struct ChokeData * c;
1933        tr_ptrArray * randPool = tr_ptrArrayNew( );
1934
1935        for( ; i<size; ++i )
1936        {
1937            if( choke[i].isInterested )
1938            {
1939                const tr_peer * peer = choke[i].peer;
1940                int x = 1, y;
1941                if( isNew( peer ) ) x *= 3;
1942                if( isSame( peer ) ) x *= 3;
1943                for( y=0; y<x; ++y )
1944                    tr_ptrArrayAppend( randPool, &choke[i] );
1945            }
1946        }
1947
1948        if(( n = tr_ptrArraySize( randPool )))
1949        {
1950            c = tr_ptrArrayNth( randPool, tr_cryptoWeakRandInt( n ));
1951            c->doUnchoke = 1;
1952            t->optimistic = c->peer;
1953        }
1954
1955        tr_ptrArrayFree( randPool, NULL );
1956    }
1957
1958    for( i = 0; i < size; ++i )
1959        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1960
1961    /* cleanup */
1962    tr_free( choke );
1963    tr_free( peers );
1964}
1965
1966static int
1967rechokePulse( void * vtorrent )
1968{
1969    Torrent * t = vtorrent;
1970
1971    torrentLock( t );
1972    rechoke( t );
1973    torrentUnlock( t );
1974    return TRUE;
1975}
1976
1977/***
1978****
1979****  Life and Death
1980****
1981***/
1982
1983static int
1984shouldPeerBeClosed( const Torrent * t,
1985                    const tr_peer * peer,
1986                    int             peerCount )
1987{
1988    const tr_torrent *       tor = t->tor;
1989    const time_t             now = time( NULL );
1990    const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1991
1992    /* if it's marked for purging, close it */
1993    if( peer->doPurge )
1994    {
1995        tordbg( t, "purging peer %s because its doPurge flag is set",
1996                tr_peerIoAddrStr( &atom->addr, atom->port ) );
1997        return TRUE;
1998    }
1999
2000    /* if we're seeding and the peer has everything we have,
2001     * and enough time has passed for a pex exchange, then disconnect */
2002    if( tr_torrentIsSeed( tor ) )
2003    {
2004        int peerHasEverything;
2005        if( atom->flags & ADDED_F_SEED_FLAG )
2006            peerHasEverything = TRUE;
2007        else if( peer->progress < tr_cpPercentDone( tor->completion ) )
2008            peerHasEverything = FALSE;
2009        else {
2010            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( tor->completion ) );
2011            tr_bitfieldDifference( tmp, peer->have );
2012            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2013            tr_bitfieldFree( tmp );
2014        }
2015        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 ))) {
2016            tordbg( t, "purging peer %s because we're both seeds",
2017                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2018            return TRUE;
2019        }
2020    }
2021
2022    /* disconnect if it's been too long since piece data has been transferred.
2023     * this is on a sliding scale based on number of available peers... */
2024    {
2025        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2026        /* if we have >= relaxIfFewerThan, strictness is 100%.
2027         * if we have zero connections, strictness is 0% */
2028        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
2029            ? 1.0
2030            : peerCount / (float)relaxStrictnessIfFewerThanN;
2031        const int lo = MIN_UPLOAD_IDLE_SECS;
2032        const int hi = MAX_UPLOAD_IDLE_SECS;
2033        const int limit = hi - ( ( hi - lo ) * strictness );
2034        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
2035/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
2036        if( idleTime > limit ) {
2037            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
2038                       tr_peerIoAddrStr( &atom->addr, atom->port ), idleTime );
2039            return TRUE;
2040        }
2041    }
2042
2043    return FALSE;
2044}
2045
2046static tr_peer **
2047getPeersToClose( Torrent * t, int * setmeSize )
2048{
2049    int i, peerCount, outsize;
2050    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( t->peers, &peerCount );
2051    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2052
2053    assert( torrentIsLocked( t ) );
2054
2055    for( i = outsize = 0; i < peerCount; ++i )
2056        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
2057            ret[outsize++] = peers[i];
2058
2059    *setmeSize = outsize;
2060    return ret;
2061}
2062
2063static int
2064compareCandidates( const void * va,
2065                   const void * vb )
2066{
2067    const struct peer_atom * a = *(const struct peer_atom**) va;
2068    const struct peer_atom * b = *(const struct peer_atom**) vb;
2069
2070    /* <Charles> Here we would probably want to try reconnecting to
2071     * peers that had most recently given us data. Lots of users have
2072     * trouble with resets due to their routers and/or ISPs. This way we
2073     * can quickly recover from an unwanted reset. So we sort
2074     * piece_data_time in descending order.
2075     */
2076
2077    if( a->piece_data_time != b->piece_data_time )
2078        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2079
2080    if( a->numFails != b->numFails )
2081        return a->numFails < b->numFails ? -1 : 1;
2082
2083    if( a->time != b->time )
2084        return a->time < b->time ? -1 : 1;
2085
2086    /* all other things being equal, prefer peers whose
2087     * information comes from a more reliable source */
2088    if( a->from != b->from )
2089        return a->from < b->from ? -1 : 1;
2090
2091    return 0;
2092}
2093
2094static int
2095getReconnectIntervalSecs( const struct peer_atom * atom )
2096{
2097    int          sec;
2098    const time_t now = time( NULL );
2099
2100    /* if we were recently connected to this peer and transferring piece
2101     * data, try to reconnect to them sooner rather that later -- we don't
2102     * want network troubles to get in the way of a good peer. */
2103    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2104        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2105
2106    /* don't allow reconnects more often than our minimum */
2107    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2108        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2109
2110    /* otherwise, the interval depends on how many times we've tried
2111     * and failed to connect to the peer */
2112    else switch( atom->numFails ) {
2113        case 0: sec = 0; break;
2114        case 1: sec = 5; break;
2115        case 2: sec = 2 * 60; break;
2116        case 3: sec = 15 * 60; break;
2117        case 4: sec = 30 * 60; break;
2118        case 5: sec = 60 * 60; break;
2119        default: sec = 120 * 60; break;
2120    }
2121
2122    return sec;
2123}
2124
2125static struct peer_atom **
2126getPeerCandidates( Torrent * t, int * setmeSize )
2127{
2128    int                 i, atomCount, retCount;
2129    struct peer_atom ** atoms;
2130    struct peer_atom ** ret;
2131    const time_t        now = time( NULL );
2132    const int           seed = tr_torrentIsSeed( t->tor );
2133
2134    assert( torrentIsLocked( t ) );
2135
2136    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &atomCount );
2137    ret = tr_new( struct peer_atom*, atomCount );
2138    for( i = retCount = 0; i < atomCount; ++i )
2139    {
2140        int                interval;
2141        struct peer_atom * atom = atoms[i];
2142
2143        /* peer fed us too much bad data ... we only keep it around
2144         * now to weed it out in case someone sends it to us via pex */
2145        if( atom->myflags & MYFLAG_BANNED )
2146            continue;
2147
2148        /* peer was unconnectable before, so we're not going to keep trying.
2149         * this is needs a separate flag from `banned', since if they try
2150         * to connect to us later, we'll let them in */
2151        if( atom->myflags & MYFLAG_UNREACHABLE )
2152            continue;
2153
2154        /* we don't need two connections to the same peer... */
2155        if( peerIsInUse( t, &atom->addr ) )
2156            continue;
2157
2158        /* no need to connect if we're both seeds... */
2159        if( seed && ( ( atom->flags & ADDED_F_SEED_FLAG ) ||
2160                      ( atom->uploadOnly == UPLOAD_ONLY_YES ) ) )
2161            continue;
2162
2163        /* don't reconnect too often */
2164        interval = getReconnectIntervalSecs( atom );
2165        if( ( now - atom->time ) < interval )
2166        {
2167            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2168                    i, tr_peerIoAddrStr( &atom->addr, atom->port ), interval );
2169            continue;
2170        }
2171
2172        /* Don't connect to peers in our blocklist */
2173        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2174            continue;
2175
2176        ret[retCount++] = atom;
2177    }
2178
2179    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2180    *setmeSize = retCount;
2181    return ret;
2182}
2183
2184static int
2185reconnectPulse( void * vtorrent )
2186{
2187    Torrent *     t = vtorrent;
2188    static time_t prevTime = 0;
2189    static int    newConnectionsThisSecond = 0;
2190    time_t        now;
2191
2192    torrentLock( t );
2193
2194    now = time( NULL );
2195    if( prevTime != now )
2196    {
2197        prevTime = now;
2198        newConnectionsThisSecond = 0;
2199    }
2200
2201    if( !t->isRunning )
2202    {
2203        removeAllPeers( t );
2204    }
2205    else
2206    {
2207        int i, nCandidates, nBad;
2208        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
2209        struct tr_peer ** connections = getPeersToClose( t, &nBad );
2210
2211        if( nBad || nCandidates )
2212            tordbg( t, "reconnect pulse for [%s]: %d bad connections, "
2213                   "%d connection candidates, %d atoms, max per pulse is %d",
2214                   t->tor->info.name, nBad, nCandidates,
2215                   tr_ptrArraySize( t->pool ),
2216                   (int)MAX_RECONNECTIONS_PER_PULSE );
2217
2218        /* disconnect some peers.
2219           if we transferred piece data, then they might be good peers,
2220           so reset their `numFails' weight to zero.  otherwise we connected
2221           to them fruitlessly, so mark it as another fail */
2222        for( i = 0; i < nBad; ++i ) {
2223            tr_peer * peer = connections[i];
2224            struct peer_atom * atom = getExistingAtom( t, &peer->addr );
2225            if( atom->piece_data_time )
2226                atom->numFails = 0;
2227            else
2228                ++atom->numFails;
2229            tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
2230            removePeer( t, peer );
2231        }
2232
2233        /* add some new ones */
2234        for( i = 0;    ( i < nCandidates )
2235           && ( i < MAX_RECONNECTIONS_PER_PULSE )
2236           && ( getPeerCount( t ) < getMaxPeerCount( t->tor ) )
2237           && ( newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND ); ++i )
2238        {
2239            tr_peerMgr *       mgr = t->manager;
2240            struct peer_atom * atom = candidates[i];
2241            tr_peerIo *        io;
2242
2243            tordbg( t, "Starting an OUTGOING connection with %s",
2244                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2245
2246            io = tr_peerIoNewOutgoing( mgr->session, &atom->addr, atom->port, t->hash );
2247            if( io == NULL )
2248            {
2249                atom->myflags |= MYFLAG_UNREACHABLE;
2250            }
2251            else
2252            {
2253                tr_handshake * handshake = tr_handshakeNew( io,
2254                                                            mgr->session->encryptionMode,
2255                                                            myHandshakeDoneCB,
2256                                                            mgr );
2257
2258                assert( tr_peerIoGetTorrentHash( io ) );
2259
2260                ++newConnectionsThisSecond;
2261
2262                tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake,
2263                                         handshakeCompare );
2264            }
2265
2266            atom->time = time( NULL );
2267        }
2268
2269        /* cleanup */
2270        tr_free( connections );
2271        tr_free( candidates );
2272    }
2273
2274    torrentUnlock( t );
2275    return TRUE;
2276}
2277
2278/****
2279*****
2280*****  BANDWIDTH ALLOCATION
2281*****
2282****/
2283
2284static void
2285pumpAllPeers( tr_peerMgr * mgr )
2286{
2287    const int torrentCount = tr_ptrArraySize( mgr->torrents );
2288    int       i, j;
2289
2290    for( i=0; i<torrentCount; ++i )
2291    {
2292        Torrent * t = tr_ptrArrayNth( mgr->torrents, i );
2293        for( j=0; j<tr_ptrArraySize( t->peers ); ++j )
2294        {
2295            tr_peer * peer = tr_ptrArrayNth( t->peers, j );
2296            tr_peerMsgsPulse( peer->msgs );
2297        }
2298    }
2299}
2300
2301static int
2302bandwidthPulse( void * vmgr )
2303{
2304    tr_peerMgr * mgr = vmgr;
2305    managerLock( mgr );
2306
2307    pumpAllPeers( mgr );
2308    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
2309    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
2310    pumpAllPeers( mgr );
2311
2312    managerUnlock( mgr );
2313    return TRUE;
2314}
Note: See TracBrowser for help on using the repository browser.