source: trunk/libtransmission/peer-mgr.c @ 7824

Last change on this file since 7824 was 7824, checked in by charles, 13 years ago

(trunk libT) #1748: possible fix for the kqueue corruption errors by consolidating the three per-torrent libevent timers into three session-wide timers. Since most people reporting this error have lots of torrents loaded, consider a hypothetical example: if you had 500 torrents, this patch will reduce 1,500 libevent timers down to just three timers. On top of that, those three have simpler life cycles too...

  • Property svn:keywords set to Date Rev Author Id
File size: 67.5 KB
Line 
1/*
2 * This file Copyright (C) 2007-2009 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 7824 2009-02-04 16:58:52Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h> /* memcpy, memcmp, strstr */
16#include <stdlib.h> /* qsort */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "session.h"
23#include "bandwidth.h"
24#include "bencode.h"
25#include "blocklist.h"
26#include "clients.h"
27#include "completion.h"
28#include "crypto.h"
29#include "fdlimit.h"
30#include "handshake.h"
31#include "inout.h" /* tr_ioTestPiece */
32#include "net.h"
33#include "peer-io.h"
34#include "peer-mgr.h"
35#include "peer-msgs.h"
36#include "ptrarray.h"
37#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
38#include "torrent.h"
39#include "trevent.h"
40#include "utils.h"
41#include "webseed.h"
42
43enum
44{
45    /* how frequently to change which peers are choked */
46    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
47
48    /* minimum interval for refilling peers' request lists */
49    REFILL_PERIOD_MSEC = 400,
50   
51    /* how frequently to reallocate bandwidth */
52    BANDWIDTH_PERIOD_MSEC = 500,
53
54    /* how frequently to decide which peers live and die */
55    RECONNECT_PERIOD_MSEC = 500,
56
57    /* when many peers are available, keep idle ones this long */
58    MIN_UPLOAD_IDLE_SECS = ( 30 ),
59
60    /* when few peers are available, keep idle ones this long */
61    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
62
63    /* max # of peers to ask fer per torrent per reconnect pulse */
64    MAX_RECONNECTIONS_PER_PULSE = 16,
65
66    /* max number of peers to ask for per second overall.
67    * this throttle is to avoid overloading the router */
68    MAX_CONNECTIONS_PER_SECOND = 32,
69
70    /* number of bad pieces a peer is allowed to send before we ban them */
71    MAX_BAD_PIECES_PER_PEER = 5,
72
73    /* use for bitwise operations w/peer_atom.myflags */
74    MYFLAG_BANNED = 1,
75
76    /* use for bitwise operations w/peer_atom.myflags */
77    /* unreachable for now... but not banned.
78     * if they try to connect to us it's okay */
79    MYFLAG_UNREACHABLE = 2,
80
81    /* the minimum we'll wait before attempting to reconnect to a peer */
82    MINIMUM_RECONNECT_INTERVAL_SECS = 5
83};
84
85
86/**
87***
88**/
89
90enum
91{
92    UPLOAD_ONLY_UKNOWN,
93    UPLOAD_ONLY_YES,
94    UPLOAD_ONLY_NO
95};
96
97/**
98 * Peer information that should be kept even before we've connected and
99 * after we've disconnected.  These are kept in a pool of peer_atoms to decide
100 * which ones would make good candidates for connecting to, and to watch out
101 * for banned peers.
102 *
103 * @see tr_peer
104 * @see tr_peermsgs
105 */
106struct peer_atom
107{
108    uint8_t     from;
109    uint8_t     flags;       /* these match the added_f flags */
110    uint8_t     myflags;     /* flags that aren't defined in added_f */
111    uint8_t     uploadOnly;  /* UPLOAD_ONLY_ */
112    tr_port     port;
113    uint16_t    numFails;
114    tr_address  addr;
115    time_t      time;        /* when the peer's connection status last changed */
116    time_t      piece_data_time;
117};
118
119typedef struct tr_torrent_peers
120{
121    tr_bool         isRunning;
122
123    uint8_t         hash[SHA_DIGEST_LENGTH];
124    int         *   pendingRequestCount;
125    tr_ptrArray     outgoingHandshakes; /* tr_handshake */
126    tr_ptrArray     pool; /* struct peer_atom */
127    tr_ptrArray     peers; /* tr_peer */
128    tr_ptrArray     webseeds; /* tr_webseed */
129    tr_torrent *    tor;
130    tr_peer *       optimistic; /* the optimistic peer, or NULL if none */
131
132    struct tr_peerMgr * manager;
133}
134Torrent;
135
136struct tr_peerMgr
137{
138    tr_session      * session;
139    tr_ptrArray       incomingHandshakes; /* tr_handshake */
140    tr_timer        * bandwidthTimer;
141    tr_timer        * rechokeTimer;
142    tr_timer        * refillTimer;
143    tr_timer        * reconnectTimer;
144};
145
146#define tordbg( t, ... ) \
147    do { \
148        if( tr_deepLoggingIsActive( ) ) \
149            tr_deepLog( __FILE__, __LINE__, t->tor->info.name, __VA_ARGS__ ); \
150    } while( 0 )
151
152#define dbgmsg( ... ) \
153    do { \
154        if( tr_deepLoggingIsActive( ) ) \
155            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
156    } while( 0 )
157
158/**
159***
160**/
161
162static TR_INLINE void
163managerLock( const struct tr_peerMgr * manager )
164{
165    tr_globalLock( manager->session );
166}
167
168static TR_INLINE void
169managerUnlock( const struct tr_peerMgr * manager )
170{
171    tr_globalUnlock( manager->session );
172}
173
174static TR_INLINE void
175torrentLock( Torrent * torrent )
176{
177    managerLock( torrent->manager );
178}
179
180static TR_INLINE void
181torrentUnlock( Torrent * torrent )
182{
183    managerUnlock( torrent->manager );
184}
185
186static TR_INLINE int
187torrentIsLocked( const Torrent * t )
188{
189    return tr_globalIsLocked( t->manager->session );
190}
191
192/**
193***
194**/
195
196static int
197handshakeCompareToAddr( const void * va, const void * vb )
198{
199    const tr_handshake * a = va;
200
201    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
202}
203
204static int
205handshakeCompare( const void * a, const void * b )
206{
207    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
208}
209
210static tr_handshake*
211getExistingHandshake( tr_ptrArray      * handshakes,
212                      const tr_address * addr )
213{
214    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
215}
216
217static int
218comparePeerAtomToAddress( const void * va, const void * vb )
219{
220    const struct peer_atom * a = va;
221
222    return tr_compareAddresses( &a->addr, vb );
223}
224
225static int
226comparePeerAtoms( const void * va, const void * vb )
227{
228    const struct peer_atom * b = vb;
229
230    return comparePeerAtomToAddress( va, &b->addr );
231}
232
233/**
234***
235**/
236
237static Torrent*
238getExistingTorrent( tr_peerMgr *    manager,
239                    const uint8_t * hash )
240{
241    tr_torrent * tor = tr_torrentFindFromHash( manager->session, hash );
242
243    return tor == NULL ? NULL : tor->torrentPeers;
244}
245
246static int
247peerCompare( const void * va, const void * vb )
248{
249    const tr_peer * a = va;
250    const tr_peer * b = vb;
251
252    return tr_compareAddresses( &a->addr, &b->addr );
253}
254
255static int
256peerCompareToAddr( const void * va, const void * vb )
257{
258    const tr_peer * a = va;
259
260    return tr_compareAddresses( &a->addr, vb );
261}
262
263static tr_peer*
264getExistingPeer( Torrent          * torrent,
265                 const tr_address * addr )
266{
267    assert( torrentIsLocked( torrent ) );
268    assert( addr );
269
270    return tr_ptrArrayFindSorted( &torrent->peers, addr, peerCompareToAddr );
271}
272
273static struct peer_atom*
274getExistingAtom( const Torrent    * t,
275                 const tr_address * addr )
276{
277    Torrent * tt = (Torrent*)t;
278    assert( torrentIsLocked( t ) );
279    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
280}
281
282static tr_bool
283peerIsInUse( const Torrent    * ct,
284             const tr_address * addr )
285{
286    Torrent * t = (Torrent*) ct;
287
288    assert( torrentIsLocked ( t ) );
289
290    return getExistingPeer( t, addr )
291        || getExistingHandshake( &t->outgoingHandshakes, addr )
292        || getExistingHandshake( &t->manager->incomingHandshakes, addr );
293}
294
295static tr_peer*
296peerConstructor( const tr_address * addr )
297{
298    tr_peer * p;
299    p = tr_new0( tr_peer, 1 );
300    p->addr = *addr;
301    return p;
302}
303
304static tr_peer*
305getPeer( Torrent          * torrent,
306         const tr_address * addr )
307{
308    tr_peer * peer;
309
310    assert( torrentIsLocked( torrent ) );
311
312    peer = getExistingPeer( torrent, addr );
313
314    if( peer == NULL )
315    {
316        peer = peerConstructor( addr );
317        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
318    }
319
320    return peer;
321}
322
323static void
324peerDestructor( tr_peer * peer )
325{
326    assert( peer );
327
328    if( peer->msgs != NULL )
329    {
330        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
331        tr_peerMsgsFree( peer->msgs );
332    }
333
334    tr_peerIoClear( peer->io );
335    tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
336
337    tr_bitfieldFree( peer->have );
338    tr_bitfieldFree( peer->blame );
339    tr_free( peer->client );
340
341    tr_free( peer );
342}
343
344static void
345removePeer( Torrent * t,
346            tr_peer * peer )
347{
348    tr_peer *          removed;
349    struct peer_atom * atom;
350
351    assert( torrentIsLocked( t ) );
352
353    atom = getExistingAtom( t, &peer->addr );
354    assert( atom );
355    atom->time = time( NULL );
356
357    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
358    assert( removed == peer );
359    peerDestructor( removed );
360}
361
362static void
363removeAllPeers( Torrent * t )
364{
365    while( !tr_ptrArrayEmpty( &t->peers ) )
366        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
367}
368
369static void
370torrentDestructor( void * vt )
371{
372    Torrent * t = vt;
373    uint8_t   hash[SHA_DIGEST_LENGTH];
374
375    assert( t );
376    assert( !t->isRunning );
377    assert( torrentIsLocked( t ) );
378    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
379    assert( tr_ptrArrayEmpty( &t->peers ) );
380
381    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
382
383    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
384    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
385    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
386    tr_ptrArrayDestruct( &t->peers, NULL );
387
388    tr_free( t->pendingRequestCount );
389    tr_free( t );
390}
391
392static void peerCallbackFunc( void * vpeer,
393                              void * vevent,
394                              void * vt );
395
396static Torrent*
397torrentConstructor( tr_peerMgr * manager,
398                    tr_torrent * tor )
399{
400    int       i;
401    Torrent * t;
402
403    t = tr_new0( Torrent, 1 );
404    t->manager = manager;
405    t->tor = tor;
406    t->pool = TR_PTR_ARRAY_INIT;
407    t->peers = TR_PTR_ARRAY_INIT;
408    t->webseeds = TR_PTR_ARRAY_INIT;
409    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
410    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
411
412    for( i = 0; i < tor->info.webseedCount; ++i )
413    {
414        tr_webseed * w =
415            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
416        tr_ptrArrayAppend( &t->webseeds, w );
417    }
418
419    return t;
420}
421
422
423static int bandwidthPulse ( void * vmgr );
424static int rechokePulse   ( void * vmgr );
425static int refillPulse    ( void * vmgr );
426static int reconnectPulse ( void * vmgr );
427
428tr_peerMgr*
429tr_peerMgrNew( tr_session * session )
430{
431    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
432
433    m->session = session;
434    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
435    m->bandwidthTimer = tr_timerNew( session, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
436    m->rechokeTimer   = tr_timerNew( session, rechokePulse,   m, RECHOKE_PERIOD_MSEC );
437    m->refillTimer    = tr_timerNew( session, refillPulse,    m, REFILL_PERIOD_MSEC );
438    m->reconnectTimer = tr_timerNew( session, reconnectPulse, m, RECONNECT_PERIOD_MSEC );
439
440    rechokePulse( m );
441
442    return m;
443}
444
445void
446tr_peerMgrFree( tr_peerMgr * manager )
447{
448    managerLock( manager );
449
450    tr_timerFree( &manager->reconnectTimer );
451    tr_timerFree( &manager->refillTimer );
452    tr_timerFree( &manager->rechokeTimer );
453    tr_timerFree( &manager->bandwidthTimer );
454
455    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
456     * the item from manager->handshakes, so this is a little roundabout... */
457    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
458        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
459
460    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
461
462    managerUnlock( manager );
463    tr_free( manager );
464}
465
466static int
467clientIsDownloadingFrom( const tr_peer * peer )
468{
469    return peer->clientIsInterested && !peer->clientIsChoked;
470}
471
472static int
473clientIsUploadingTo( const tr_peer * peer )
474{
475    return peer->peerIsInterested && !peer->peerIsChoked;
476}
477
478/***
479****
480***/
481
482tr_bool
483tr_peerMgrPeerIsSeed( const tr_torrent  * tor,
484                      const tr_address  * addr )
485{
486    tr_bool isSeed = FALSE;
487    const Torrent * t = tor->torrentPeers;
488    const struct peer_atom * atom = getExistingAtom( t, addr );
489
490    if( atom )
491        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
492
493    return isSeed;
494}
495
496/****
497*****
498*****  REFILL
499*****
500****/
501
502static void
503assertValidPiece( Torrent * t, tr_piece_index_t piece )
504{
505    assert( t );
506    assert( t->tor );
507    assert( piece < t->tor->info.pieceCount );
508}
509
510static int
511getPieceRequests( Torrent * t, tr_piece_index_t piece )
512{
513    assertValidPiece( t, piece );
514
515    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
516}
517
518static void
519incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
520{
521    assertValidPiece( t, piece );
522
523    if( t->pendingRequestCount == NULL )
524        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
525    t->pendingRequestCount[piece]++;
526}
527
528static void
529decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
530{
531    assertValidPiece( t, piece );
532
533    if( t->pendingRequestCount )
534        t->pendingRequestCount[piece]--;
535}
536
537struct tr_refill_piece
538{
539    tr_priority_t    priority;
540    uint32_t         piece;
541    uint32_t         peerCount;
542    int              random;
543    int              pendingRequestCount;
544    int              missingBlockCount;
545};
546
547static int
548compareRefillPiece( const void * aIn, const void * bIn )
549{
550    const struct tr_refill_piece * a = aIn;
551    const struct tr_refill_piece * b = bIn;
552
553    /* if one piece has a higher priority, it goes first */
554    if( a->priority != b->priority )
555        return a->priority > b->priority ? -1 : 1;
556
557    /* have a per-priority endgame */
558    if( a->pendingRequestCount != b->pendingRequestCount )
559        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
560
561    /* fewer missing pieces goes first */
562    if( a->missingBlockCount != b->missingBlockCount )
563        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
564
565    /* otherwise if one has fewer peers, it goes first */
566    if( a->peerCount != b->peerCount )
567        return a->peerCount < b->peerCount ? -1 : 1;
568
569    /* otherwise go with our random seed */
570    if( a->random != b->random )
571        return a->random < b->random ? -1 : 1;
572
573    return 0;
574}
575
576static tr_piece_index_t *
577getPreferredPieces( Torrent * t, tr_piece_index_t * pieceCount )
578{
579    const tr_torrent  * tor = t->tor;
580    const tr_info     * inf = &tor->info;
581    tr_piece_index_t    i;
582    tr_piece_index_t    poolSize = 0;
583    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
584    int                 peerCount;
585    const tr_peer    ** peers;
586
587    assert( torrentIsLocked( t ) );
588
589    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
590    peerCount = tr_ptrArraySize( &t->peers );
591
592    /* make a list of the pieces that we want but don't have */
593    for( i = 0; i < inf->pieceCount; ++i )
594        if( !tor->info.pieces[i].dnd
595                && !tr_cpPieceIsComplete( &tor->completion, i ) )
596            pool[poolSize++] = i;
597
598    /* sort the pool by which to request next */
599    if( poolSize > 1 )
600    {
601        tr_piece_index_t j;
602        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
603
604        for( j = 0; j < poolSize; ++j )
605        {
606            int k;
607            const tr_piece_index_t piece = pool[j];
608            struct tr_refill_piece * setme = p + j;
609
610            setme->piece = piece;
611            setme->priority = inf->pieces[piece].priority;
612            setme->peerCount = 0;
613            setme->random = tr_cryptoWeakRandInt( INT_MAX );
614            setme->pendingRequestCount = getPieceRequests( t, piece );
615            setme->missingBlockCount
616                         = tr_cpMissingBlocksInPiece( &tor->completion, piece );
617
618            for( k = 0; k < peerCount; ++k )
619            {
620                const tr_peer * peer = peers[k];
621                if( peer->peerIsInterested
622                        && !peer->clientIsChoked
623                        && tr_bitfieldHas( peer->have, piece ) )
624                    ++setme->peerCount;
625            }
626        }
627
628        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
629               compareRefillPiece );
630
631        for( j = 0; j < poolSize; ++j )
632            pool[j] = p[j].piece;
633
634        tr_free( p );
635    }
636
637    *pieceCount = poolSize;
638    return pool;
639}
640
641struct tr_blockIterator
642{
643    Torrent * t;
644    tr_block_index_t blockIndex, blockCount, *blocks;
645    tr_piece_index_t pieceIndex, pieceCount, *pieces;
646};
647
648static struct tr_blockIterator*
649blockIteratorNew( Torrent * t )
650{
651    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
652    i->t = t;
653    i->pieces = getPreferredPieces( t, &i->pieceCount );
654    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCountInPiece );
655    return i;
656}
657
658static int
659blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
660{
661    int found;
662    Torrent * t = i->t;
663    tr_torrent * tor = t->tor;
664
665    while( ( i->blockIndex == i->blockCount )
666        && ( i->pieceIndex < i->pieceCount ) )
667    {
668        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
669        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
670        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
671        tr_block_index_t block;
672
673        assert( index < tor->info.pieceCount );
674
675        i->blockCount = 0;
676        i->blockIndex = 0;
677        for( block=b; block!=e; ++block )
678            if( !tr_cpBlockIsComplete( &tor->completion, block ) )
679                i->blocks[i->blockCount++] = block;
680    }
681
682    assert( i->blockCount <= tor->blockCountInPiece );
683
684    if(( found = ( i->blockIndex < i->blockCount )))
685        *setme = i->blocks[i->blockIndex++];
686
687    return found;
688}
689
690static void
691blockIteratorFree( struct tr_blockIterator * i )
692{
693    tr_free( i->blocks );
694    tr_free( i->pieces );
695    tr_free( i );
696}
697
698static tr_peer**
699getPeersUploadingToClient( Torrent * t,
700                           int *     setmeCount )
701{
702    int j;
703    int peerCount = 0;
704    int retCount = 0;
705    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
706    tr_peer ** ret = tr_new( tr_peer *, peerCount );
707
708    j = 0; /* this is a temporary test to make sure we walk through all the peers */
709    if( peerCount )
710    {
711        /* Get a list of peers we're downloading from.
712           Pick a different starting point each time so all peers
713           get a chance at being the first in line */
714        const int fencepost = tr_cryptoWeakRandInt( peerCount );
715        int i = fencepost;
716        do {
717            if( clientIsDownloadingFrom( peers[i] ) )
718                ret[retCount++] = peers[i];
719            i = ( i + 1 ) % peerCount;
720            ++j;
721        } while( i != fencepost );
722    }
723    assert( j == peerCount );
724    *setmeCount = retCount;
725    return ret;
726}
727
728static uint32_t
729getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
730{
731    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
732    const uint64_t blockPos = tor->blockSize * b;
733    assert( blockPos >= piecePos );
734    return (uint32_t)( blockPos - piecePos );
735}
736
737static void
738refillTorrent( Torrent * t )
739{
740    tr_block_index_t block;
741    int peerCount;
742    int webseedCount;
743    tr_peer ** peers;
744    tr_webseed ** webseeds;
745    struct tr_blockIterator * blockIterator;
746    tr_torrent * tor = t->tor;
747
748    if( !t->isRunning )
749        return;
750    if( tr_torrentIsSeed( t->tor ) )
751        return;
752
753    tordbg( t, "Refilling Request Buffers..." );
754
755    blockIterator = blockIteratorNew( t );
756    peers = getPeersUploadingToClient( t, &peerCount );
757    webseedCount = tr_ptrArraySize( &t->webseeds );
758    webseeds = tr_memdup( tr_ptrArrayBase( &t->webseeds ),
759                          webseedCount * sizeof( tr_webseed* ) );
760
761    while( ( webseedCount || peerCount )
762        && blockIteratorNext( blockIterator, &block ) )
763    {
764        int j;
765        int handled = FALSE;
766
767        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
768        const uint32_t offset = getBlockOffsetInPiece( tor, block );
769        const uint32_t length = tr_torBlockCountBytes( tor, block );
770
771        /* find a peer who can ask for this block */
772        for( j=0; !handled && j<peerCount; )
773        {
774            const tr_addreq_t val = tr_peerMsgsAddRequest( peers[j]->msgs, index, offset, length );
775            switch( val )
776            {
777                case TR_ADDREQ_FULL:
778                case TR_ADDREQ_CLIENT_CHOKED:
779                    peers[j] = peers[--peerCount];
780                    break;
781
782                case TR_ADDREQ_MISSING:
783                case TR_ADDREQ_DUPLICATE:
784                    ++j;
785                    break;
786
787                case TR_ADDREQ_OK:
788                    incrementPieceRequests( t, index );
789                    handled = TRUE;
790                    break;
791
792                default:
793                    assert( 0 && "unhandled value" );
794                    break;
795            }
796        }
797
798        /* maybe one of the webseeds can do it */
799        for( j=0; !handled && j<webseedCount; )
800        {
801            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j], index, offset, length );
802            switch( val )
803            {
804                case TR_ADDREQ_FULL:
805                    webseeds[j] = webseeds[--webseedCount];
806                    break;
807
808                case TR_ADDREQ_OK:
809                    incrementPieceRequests( t, index );
810                    handled = TRUE;
811                    break;
812
813                default:
814                    assert( 0 && "unhandled value" );
815                    break;
816            }
817        }
818    }
819
820    /* cleanup */
821    blockIteratorFree( blockIterator );
822    tr_free( webseeds );
823    tr_free( peers );
824}
825
826static int
827refillPulse( void * vmgr )
828{
829    tr_torrent * tor = NULL;
830    tr_peerMgr * mgr = vmgr;
831    managerLock( mgr );
832
833    while(( tor = tr_torrentNext( mgr->session, tor )))
834        if( tor->isRunning && !tr_torrentIsSeed( tor ) )
835            refillTorrent( tor->torrentPeers );
836
837    managerUnlock( mgr );
838    return TRUE;
839}
840
841static void
842broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
843{
844    size_t i;
845    size_t peerCount;
846    tr_peer ** peers;
847
848    assert( torrentIsLocked( t ) );
849
850    tordbg( t, "got a block; cancelling any duplicate requests from peers %"PRIu32":%"PRIu32"->%"PRIu32, index, offset, length );
851
852    peerCount = tr_ptrArraySize( &t->peers );
853    peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
854    for( i=0; i<peerCount; ++i )
855        if( peers[i]->msgs )
856            tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
857}
858
859static void
860addStrike( Torrent * t,
861           tr_peer * peer )
862{
863    tordbg( t, "increasing peer %s strike count to %d",
864            tr_peerIoAddrStr( &peer->addr,
865                              peer->port ), peer->strikes + 1 );
866
867    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
868    {
869        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
870        atom->myflags |= MYFLAG_BANNED;
871        peer->doPurge = 1;
872        tordbg( t, "banning peer %s", tr_peerIoAddrStr( &atom->addr, atom->port ) );
873    }
874}
875
876static void
877gotBadPiece( Torrent *        t,
878             tr_piece_index_t pieceIndex )
879{
880    tr_torrent *   tor = t->tor;
881    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
882
883    tor->corruptCur += byteCount;
884    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
885}
886
887static void
888peerSuggestedPiece( Torrent            * t UNUSED,
889                    tr_peer            * peer UNUSED,
890                    tr_piece_index_t     pieceIndex UNUSED,
891                    int                  isFastAllowed UNUSED )
892{
893#if 0
894    assert( t );
895    assert( peer );
896    assert( peer->msgs );
897
898    /* is this a valid piece? */
899    if(  pieceIndex >= t->tor->info.pieceCount )
900        return;
901
902    /* don't ask for it if we've already got it */
903    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
904        return;
905
906    /* don't ask for it if they don't have it */
907    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
908        return;
909
910    /* don't ask for it if we're choked and it's not fast */
911    if( !isFastAllowed && peer->clientIsChoked )
912        return;
913
914    /* request the blocks that we don't have in this piece */
915    {
916        tr_block_index_t block;
917        const tr_torrent * tor = t->tor;
918        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
919        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
920
921        for( block=start; block<end; ++block )
922        {
923            if( !tr_cpBlockIsComplete( tor->completion, block ) )
924            {
925                const uint32_t offset = getBlockOffsetInPiece( tor, block );
926                const uint32_t length = tr_torBlockCountBytes( tor, block );
927                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
928                incrementPieceRequests( t, pieceIndex );
929            }
930        }
931    }
932#endif
933}
934
935static void
936peerCallbackFunc( void * vpeer, void * vevent, void * vt )
937{
938    tr_peer * peer = vpeer; /* may be NULL if peer is a webseed */
939    Torrent * t = vt;
940    const tr_peer_event * e = vevent;
941
942    torrentLock( t );
943
944    switch( e->eventType )
945    {
946        case TR_PEER_UPLOAD_ONLY:
947            /* update our atom */
948            if( peer ) {
949                struct peer_atom * a = getExistingAtom( t, &peer->addr );
950                a->uploadOnly = e->uploadOnly ? UPLOAD_ONLY_YES : UPLOAD_ONLY_NO;
951            }
952            break;
953
954        case TR_PEER_CANCEL:
955            decrementPieceRequests( t, e->pieceIndex );
956            break;
957
958        case TR_PEER_PEER_GOT_DATA:
959        {
960            const time_t now = time( NULL );
961            tr_torrent * tor = t->tor;
962
963            tor->activityDate = now;
964
965            if( e->wasPieceData )
966                tor->uploadedCur += e->length;
967
968            /* update the stats */
969            if( e->wasPieceData )
970                tr_statsAddUploaded( tor->session, e->length );
971
972            /* update our atom */
973            if( peer ) {
974                struct peer_atom * a = getExistingAtom( t, &peer->addr );
975                if( e->wasPieceData )
976                    a->piece_data_time = now;
977            }
978
979            break;
980        }
981
982        case TR_PEER_CLIENT_GOT_SUGGEST:
983            if( peer )
984                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
985            break;
986
987        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
988            if( peer )
989                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
990            break;
991
992        case TR_PEER_CLIENT_GOT_DATA:
993        {
994            const time_t now = time( NULL );
995            tr_torrent * tor = t->tor;
996            tor->activityDate = now;
997
998            /* only add this to downloadedCur if we got it from a peer --
999             * webseeds shouldn't count against our ratio.  As one tracker
1000             * admin put it, "Those pieces are downloaded directly from the
1001             * content distributor, not the peers, it is the tracker's job
1002             * to manage the swarms, not the web server and does not fit
1003             * into the jurisdiction of the tracker." */
1004            if( peer && e->wasPieceData )
1005                tor->downloadedCur += e->length;
1006
1007            /* update the stats */ 
1008            if( e->wasPieceData )
1009                tr_statsAddDownloaded( tor->session, e->length );
1010
1011            /* update our atom */
1012            if( peer ) {
1013                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1014                if( e->wasPieceData )
1015                    a->piece_data_time = now;
1016            }
1017
1018            break;
1019        }
1020
1021        case TR_PEER_PEER_PROGRESS:
1022        {
1023            if( peer )
1024            {
1025                struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1026                const int peerIsSeed = e->progress >= 1.0;
1027                if( peerIsSeed ) {
1028                    tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1029                    atom->flags |= ADDED_F_SEED_FLAG;
1030                } else {
1031                    tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1032                    atom->flags &= ~ADDED_F_SEED_FLAG;
1033                }
1034            }
1035            break;
1036        }
1037
1038        case TR_PEER_CLIENT_GOT_BLOCK:
1039        {
1040            tr_torrent * tor = t->tor;
1041
1042            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1043
1044            tr_cpBlockAdd( &tor->completion, block );
1045            decrementPieceRequests( t, e->pieceIndex );
1046
1047            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1048
1049            if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1050            {
1051                const tr_piece_index_t p = e->pieceIndex;
1052                const tr_bool ok = tr_ioTestPiece( tor, p, NULL, 0 );
1053
1054                if( !ok )
1055                {
1056                    tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1057                               (unsigned long)p );
1058                }
1059
1060                tr_torrentSetHasPiece( tor, p, ok );
1061                tr_torrentSetPieceChecked( tor, p, TRUE );
1062                tr_peerMgrSetBlame( tor, p, ok );
1063
1064                if( !ok )
1065                {
1066                    gotBadPiece( t, p );
1067                }
1068                else
1069                {
1070                    int i;
1071                    int peerCount;
1072                    tr_peer ** peers;
1073                    tr_file_index_t fileIndex;
1074
1075                    peerCount = tr_ptrArraySize( &t->peers );
1076                    peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1077                    for( i=0; i<peerCount; ++i )
1078                        tr_peerMsgsHave( peers[i]->msgs, p );
1079
1080                    for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex )
1081                    {
1082                        const tr_file * file = &tor->info.files[fileIndex];
1083                        if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) && tr_cpFileIsComplete( &tor->completion, fileIndex ) )
1084                        {
1085                            char * path = tr_buildPath( tor->downloadDir, file->name, NULL );
1086                            tordbg( t, "closing recently-completed file \"%s\"", path );
1087                            tr_fdFileClose( path );
1088                            tr_free( path );
1089                        }
1090                    }
1091                }
1092
1093                tr_torrentRecheckCompleteness( tor );
1094            }
1095            break;
1096        }
1097
1098        case TR_PEER_ERROR:
1099            if( e->err == EINVAL )
1100            {
1101                addStrike( t, peer );
1102                peer->doPurge = 1;
1103                tordbg( t, "setting %s doPurge flag because we got an EINVAL error", tr_peerIoAddrStr( &peer->addr, peer->port ) );
1104            }
1105            else if( ( e->err == ERANGE )
1106                  || ( e->err == EMSGSIZE )
1107                  || ( e->err == ENOTCONN ) )
1108            {
1109                /* some protocol error from the peer */
1110                peer->doPurge = 1;
1111                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error", tr_peerIoAddrStr( &peer->addr, peer->port ) );
1112            }
1113            else /* a local error, such as an IO error */
1114            {
1115                t->tor->error = e->err;
1116                tr_strlcpy( t->tor->errorString,
1117                            tr_strerror( t->tor->error ),
1118                            sizeof( t->tor->errorString ) );
1119                tr_torrentStop( t->tor );
1120            }
1121            break;
1122
1123        default:
1124            assert( 0 );
1125    }
1126
1127    torrentUnlock( t );
1128}
1129
1130static void
1131ensureAtomExists( Torrent          * t,
1132                  const tr_address * addr,
1133                  tr_port            port,
1134                  uint8_t            flags,
1135                  uint8_t            from )
1136{
1137    if( getExistingAtom( t, addr ) == NULL )
1138    {
1139        struct peer_atom * a;
1140        a = tr_new0( struct peer_atom, 1 );
1141        a->addr = *addr;
1142        a->port = port;
1143        a->flags = flags;
1144        a->from = from;
1145        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr( &a->addr, a->port ) );
1146        tr_ptrArrayInsertSorted( &t->pool, a, comparePeerAtoms );
1147    }
1148}
1149
1150static int
1151getMaxPeerCount( const tr_torrent * tor )
1152{
1153    return tor->maxConnectedPeers;
1154}
1155
1156static int
1157getPeerCount( const Torrent * t )
1158{
1159    return tr_ptrArraySize( &t->peers );// + tr_ptrArraySize( &t->outgoingHandshakes );
1160}
1161
1162/* FIXME: this is kind of a mess. */
1163static tr_bool
1164myHandshakeDoneCB( tr_handshake  * handshake,
1165                   tr_peerIo     * io,
1166                   tr_bool         isConnected,
1167                   const uint8_t * peer_id,
1168                   void          * vmanager )
1169{
1170    tr_bool            ok = isConnected;
1171    tr_bool            success = FALSE;
1172    tr_port            port;
1173    const tr_address * addr;
1174    tr_peerMgr       * manager = vmanager;
1175    Torrent          * t;
1176    tr_handshake     * ours;
1177
1178    assert( io );
1179    assert( tr_isBool( ok ) );
1180
1181    t = tr_peerIoHasTorrentHash( io )
1182        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1183        : NULL;
1184
1185    if( tr_peerIoIsIncoming ( io ) )
1186        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1187                                        handshake, handshakeCompare );
1188    else if( t )
1189        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1190                                        handshake, handshakeCompare );
1191    else
1192        ours = handshake;
1193
1194    assert( ours );
1195    assert( ours == handshake );
1196
1197    if( t )
1198        torrentLock( t );
1199
1200    addr = tr_peerIoGetAddress( io, &port );
1201
1202    if( !ok || !t || !t->isRunning )
1203    {
1204        if( t )
1205        {
1206            struct peer_atom * atom = getExistingAtom( t, addr );
1207            if( atom )
1208                ++atom->numFails;
1209        }
1210    }
1211    else /* looking good */
1212    {
1213        struct peer_atom * atom;
1214        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1215        atom = getExistingAtom( t, addr );
1216        atom->time = time( NULL );
1217        atom->piece_data_time = 0;
1218
1219        if( atom->myflags & MYFLAG_BANNED )
1220        {
1221            tordbg( t, "banned peer %s tried to reconnect",
1222                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
1223        }
1224        else if( tr_peerIoIsIncoming( io )
1225               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1226
1227        {
1228        }
1229        else
1230        {
1231            tr_peer * peer = getExistingPeer( t, addr );
1232
1233            if( peer ) /* we already have this peer */
1234            {
1235            }
1236            else
1237            {
1238                peer = getPeer( t, addr );
1239                tr_free( peer->client );
1240
1241                if( !peer_id )
1242                    peer->client = NULL;
1243                else {
1244                    char client[128];
1245                    tr_clientForId( client, sizeof( client ), peer_id );
1246                    peer->client = tr_strdup( client );
1247                }
1248
1249                peer->port = port;
1250                peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is
1251                                                                balanced by our unref in peerDestructor()  */
1252                tr_peerIoSetParent( peer->io, t->tor->bandwidth );
1253                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1254
1255                success = TRUE;
1256            }
1257        }
1258    }
1259
1260    if( t )
1261        torrentUnlock( t );
1262
1263    return success;
1264}
1265
1266void
1267tr_peerMgrAddIncoming( tr_peerMgr * manager,
1268                       tr_address * addr,
1269                       tr_port      port,
1270                       int          socket )
1271{
1272    managerLock( manager );
1273
1274    if( tr_sessionIsAddressBlocked( manager->session, addr ) )
1275    {
1276        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
1277        tr_netClose( socket );
1278    }
1279    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
1280    {
1281        tr_netClose( socket );
1282    }
1283    else /* we don't have a connetion to them yet... */
1284    {
1285        tr_peerIo *    io;
1286        tr_handshake * handshake;
1287
1288        io = tr_peerIoNewIncoming( manager->session, manager->session->bandwidth, addr, port, socket );
1289
1290        handshake = tr_handshakeNew( io,
1291                                     manager->session->encryptionMode,
1292                                     myHandshakeDoneCB,
1293                                     manager );
1294
1295        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
1296
1297        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
1298                                 handshakeCompare );
1299    }
1300
1301    managerUnlock( manager );
1302}
1303
1304static tr_bool
1305tr_isPex( const tr_pex * pex )
1306{
1307    return pex && tr_isAddress( &pex->addr );
1308}
1309
1310void
1311tr_peerMgrAddPex( tr_torrent   *  tor,
1312                  uint8_t         from,
1313                  const tr_pex *  pex )
1314{
1315    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
1316    {
1317        Torrent * t = tor->torrentPeers;
1318        managerLock( t->manager );
1319
1320        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
1321            if( tr_isValidPeerAddress( &pex->addr, pex->port ) )
1322                ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from );
1323
1324        managerUnlock( t->manager );
1325    }
1326}
1327
1328tr_pex *
1329tr_peerMgrCompactToPex( const void *    compact,
1330                        size_t          compactLen,
1331                        const uint8_t * added_f,
1332                        size_t          added_f_len,
1333                        size_t *        pexCount )
1334{
1335    size_t          i;
1336    size_t          n = compactLen / 6;
1337    const uint8_t * walk = compact;
1338    tr_pex *        pex = tr_new0( tr_pex, n );
1339
1340    for( i = 0; i < n; ++i )
1341    {
1342        pex[i].addr.type = TR_AF_INET;
1343        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
1344        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1345        if( added_f && ( n == added_f_len ) )
1346            pex[i].flags = added_f[i];
1347    }
1348
1349    *pexCount = n;
1350    return pex;
1351}
1352
1353tr_pex *
1354tr_peerMgrCompact6ToPex( const void    * compact,
1355                         size_t          compactLen,
1356                         const uint8_t * added_f,
1357                         size_t          added_f_len,
1358                         size_t        * pexCount )
1359{
1360    size_t          i;
1361    size_t          n = compactLen / 18;
1362    const uint8_t * walk = compact;
1363    tr_pex *        pex = tr_new0( tr_pex, n );
1364   
1365    for( i = 0; i < n; ++i )
1366    {
1367        pex[i].addr.type = TR_AF_INET6;
1368        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
1369        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1370        if( added_f && ( n == added_f_len ) )
1371            pex[i].flags = added_f[i];
1372    }
1373   
1374    *pexCount = n;
1375    return pex;
1376}
1377
1378tr_pex *
1379tr_peerMgrArrayToPex( const void * array,
1380                      size_t       arrayLen,
1381                      size_t      * pexCount )
1382{
1383    size_t          i;
1384    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
1385    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
1386    const uint8_t * walk = array;
1387    tr_pex        * pex = tr_new0( tr_pex, n );
1388   
1389    for( i = 0 ; i < n ; i++ ) {
1390        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
1391        tr_suspectAddress( &pex[i].addr, "tracker"  );
1392        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
1393        pex[i].flags = 0x00;
1394        walk += sizeof( tr_address ) + 2;
1395    }
1396   
1397    *pexCount = n;
1398    return pex;
1399}
1400
1401/**
1402***
1403**/
1404
1405void
1406tr_peerMgrSetBlame( tr_torrent     * tor,
1407                    tr_piece_index_t pieceIndex,
1408                    int              success )
1409{
1410    if( !success )
1411    {
1412        int        peerCount, i;
1413        Torrent *  t = tor->torrentPeers;
1414        tr_peer ** peers;
1415
1416        assert( torrentIsLocked( t ) );
1417
1418        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
1419        for( i = 0; i < peerCount; ++i )
1420        {
1421            tr_peer * peer = peers[i];
1422            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1423            {
1424                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1425                        tr_peerIoAddrStr( &peer->addr, peer->port ),
1426                        pieceIndex, (int)peer->strikes + 1 );
1427                addStrike( t, peer );
1428            }
1429        }
1430    }
1431}
1432
1433int
1434tr_pexCompare( const void * va, const void * vb )
1435{
1436    const tr_pex * a = va;
1437    const tr_pex * b = vb;
1438    int i;
1439
1440    assert( tr_isPex( a ) );
1441    assert( tr_isPex( b ) );
1442
1443    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
1444        return i;
1445
1446    if( a->port != b->port )
1447        return a->port < b->port ? -1 : 1;
1448
1449    return 0;
1450}
1451
1452static int
1453peerPrefersCrypto( const tr_peer * peer )
1454{
1455    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1456        return TRUE;
1457
1458    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1459        return FALSE;
1460
1461    return tr_peerIoIsEncrypted( peer->io );
1462}
1463
1464int
1465tr_peerMgrGetPeers( tr_torrent      * tor,
1466                    tr_pex         ** setme_pex,
1467                    uint8_t           af)
1468{
1469    int peersReturning = 0;
1470    const Torrent * t = tor->torrentPeers;
1471
1472    managerLock( t->manager );
1473
1474    {
1475        int i;
1476        const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1477        const int peerCount = tr_ptrArraySize( &t->peers );
1478        /* for now, this will waste memory on torrents that have both
1479         * ipv6 and ipv4 peers */
1480        tr_pex * pex = tr_new( tr_pex, peerCount );
1481        tr_pex * walk = pex;
1482
1483        for( i=0; i<peerCount; ++i )
1484        {
1485            const tr_peer * peer = peers[i];
1486            if( peer->addr.type == af )
1487            {
1488                const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1489
1490                assert( tr_isAddress( &peer->addr ) );
1491                walk->addr = peer->addr;
1492                walk->port = peer->port;
1493                walk->flags = 0;
1494                if( peerPrefersCrypto( peer ) )
1495                    walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1496                if( ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 ) )
1497                    walk->flags |= ADDED_F_SEED_FLAG;
1498                ++peersReturning;
1499                ++walk;
1500            }
1501        }
1502
1503        assert( ( walk - pex ) == peersReturning );
1504        qsort( pex, peersReturning, sizeof( tr_pex ), tr_pexCompare );
1505        *setme_pex = pex;
1506    }
1507
1508    managerUnlock( t->manager );
1509    return peersReturning;
1510}
1511
1512void
1513tr_peerMgrStartTorrent( tr_torrent * tor )
1514{
1515    tor->torrentPeers->isRunning = TRUE;
1516}
1517
1518static void
1519stopTorrent( Torrent * t )
1520{
1521    assert( torrentIsLocked( t ) );
1522
1523    t->isRunning = FALSE;
1524
1525    /* disconnect the peers. */
1526    tr_ptrArrayForeach( &t->peers, (PtrArrayForeachFunc)peerDestructor );
1527    tr_ptrArrayClear( &t->peers );
1528
1529    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1530     * which removes the handshake from t->outgoingHandshakes... */
1531    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
1532        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
1533}
1534
1535void
1536tr_peerMgrStopTorrent( tr_torrent * tor )
1537{
1538    Torrent * t = tor->torrentPeers;
1539
1540    managerLock( t->manager );
1541
1542    stopTorrent( t );
1543
1544    managerUnlock( t->manager );
1545}
1546
1547void
1548tr_peerMgrAddTorrent( tr_peerMgr * manager,
1549                      tr_torrent * tor )
1550{
1551    managerLock( manager );
1552
1553    assert( tor );
1554    assert( tor->torrentPeers == NULL );
1555
1556    tor->torrentPeers = torrentConstructor( manager, tor );
1557
1558    managerUnlock( manager );
1559}
1560
1561void
1562tr_peerMgrRemoveTorrent( tr_torrent * tor )
1563{
1564    tr_torrentLock( tor );
1565
1566    stopTorrent( tor->torrentPeers );
1567    torrentDestructor( tor->torrentPeers );
1568
1569    tr_torrentUnlock( tor );
1570}
1571
1572void
1573tr_peerMgrTorrentAvailability( const tr_torrent * tor,
1574                               int8_t           * tab,
1575                               unsigned int       tabCount )
1576{
1577    tr_piece_index_t   i;
1578    const Torrent *    t;
1579    float              interval;
1580    tr_bool            isSeed;
1581    int                peerCount;
1582    const tr_peer **   peers;
1583    tr_torrentLock( tor );
1584
1585    t = tor->torrentPeers;
1586    tor = t->tor;
1587    interval = tor->info.pieceCount / (float)tabCount;
1588    isSeed = tor && ( tr_cpGetStatus ( &tor->completion ) == TR_SEED );
1589    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1590    peerCount = tr_ptrArraySize( &t->peers );
1591
1592    memset( tab, 0, tabCount );
1593
1594    for( i = 0; tor && i < tabCount; ++i )
1595    {
1596        const int piece = i * interval;
1597
1598        if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
1599            tab[i] = -1;
1600        else if( peerCount ) {
1601            int j;
1602            for( j = 0; j < peerCount; ++j )
1603                if( tr_bitfieldHas( peers[j]->have, i ) )
1604                    ++tab[i];
1605        }
1606    }
1607
1608    tr_torrentUnlock( tor );
1609}
1610
1611/* Returns the pieces that are available from peers */
1612tr_bitfield*
1613tr_peerMgrGetAvailable( const tr_torrent * tor )
1614{
1615    int i;
1616    int peerCount;
1617    Torrent * t = tor->torrentPeers;
1618    const tr_peer ** peers;
1619    tr_bitfield * pieces;
1620    managerLock( t->manager );
1621
1622    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1623    peerCount = tr_ptrArraySize( &t->peers );
1624    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1625    for( i=0; i<peerCount; ++i )
1626        tr_bitfieldOr( pieces, peers[i]->have );
1627
1628    managerUnlock( t->manager );
1629    return pieces;
1630}
1631
1632void
1633tr_peerMgrTorrentStats( tr_torrent       * tor,
1634                        int              * setmePeersKnown,
1635                        int              * setmePeersConnected,
1636                        int              * setmeSeedsConnected,
1637                        int              * setmeWebseedsSendingToUs,
1638                        int              * setmePeersSendingToUs,
1639                        int              * setmePeersGettingFromUs,
1640                        int              * setmePeersFrom )
1641{
1642    int i, size;
1643    const Torrent * t = tor->torrentPeers;
1644    const tr_peer ** peers;
1645    const tr_webseed ** webseeds;
1646
1647    managerLock( t->manager );
1648
1649    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1650    size = tr_ptrArraySize( &t->peers );
1651
1652    *setmePeersKnown           = tr_ptrArraySize( &t->pool );
1653    *setmePeersConnected       = 0;
1654    *setmeSeedsConnected       = 0;
1655    *setmePeersGettingFromUs   = 0;
1656    *setmePeersSendingToUs     = 0;
1657    *setmeWebseedsSendingToUs  = 0;
1658
1659    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1660        setmePeersFrom[i] = 0;
1661
1662    for( i=0; i<size; ++i )
1663    {
1664        const tr_peer * peer = peers[i];
1665        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1666
1667        if( peer->io == NULL ) /* not connected */
1668            continue;
1669
1670        ++*setmePeersConnected;
1671
1672        ++setmePeersFrom[atom->from];
1673
1674        if( clientIsDownloadingFrom( peer ) )
1675            ++*setmePeersSendingToUs;
1676
1677        if( clientIsUploadingTo( peer ) )
1678            ++*setmePeersGettingFromUs;
1679
1680        if( atom->flags & ADDED_F_SEED_FLAG )
1681            ++*setmeSeedsConnected;
1682    }
1683
1684    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1685    size = tr_ptrArraySize( &t->webseeds );
1686    for( i=0; i<size; ++i )
1687        if( tr_webseedIsActive( webseeds[i] ) )
1688            ++*setmeWebseedsSendingToUs;
1689
1690    managerUnlock( t->manager );
1691}
1692
1693float*
1694tr_peerMgrWebSpeeds( const tr_torrent * tor )
1695{
1696    const Torrent * t = tor->torrentPeers;
1697    const tr_webseed ** webseeds;
1698    int i;
1699    int webseedCount;
1700    float * ret;
1701    uint64_t now;
1702
1703    assert( t->manager );
1704    managerLock( t->manager );
1705
1706    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1707    webseedCount = tr_ptrArraySize( &t->webseeds );
1708    assert( webseedCount == tor->info.webseedCount );
1709    ret = tr_new0( float, webseedCount );
1710    now = tr_date( );
1711
1712    for( i=0; i<webseedCount; ++i )
1713        if( !tr_webseedGetSpeed( webseeds[i], now, &ret[i] ) )
1714            ret[i] = -1.0;
1715
1716    managerUnlock( t->manager );
1717    return ret;
1718}
1719
1720double
1721tr_peerGetPieceSpeed( const tr_peer * peer, uint64_t now, tr_direction direction )
1722{
1723    return peer->io ? tr_peerIoGetPieceSpeed( peer->io, now, direction ) : 0.0;
1724}
1725
1726
1727struct tr_peer_stat *
1728tr_peerMgrPeerStats( const tr_torrent    * tor,
1729                     int                 * setmeCount )
1730{
1731    int i, size;
1732    const Torrent * t = tor->torrentPeers;
1733    const tr_peer ** peers;
1734    tr_peer_stat * ret;
1735    uint64_t now;
1736
1737    assert( t->manager );
1738    managerLock( t->manager );
1739
1740    size = tr_ptrArraySize( &t->peers );
1741    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1742    ret = tr_new0( tr_peer_stat, size );
1743    now = tr_date( );
1744
1745    for( i = 0; i < size; ++i )
1746    {
1747        char *                   pch;
1748        const tr_peer *          peer = peers[i];
1749        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1750        tr_peer_stat *           stat = ret + i;
1751        tr_address               norm_addr;
1752
1753        norm_addr = peer->addr;
1754        tr_normalizeV4Mapped( &norm_addr );
1755        tr_ntop( &norm_addr, stat->addr, sizeof( stat->addr ) );
1756        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
1757                   sizeof( stat->client ) );
1758        stat->port               = ntohs( peer->port );
1759        stat->from               = atom->from;
1760        stat->progress           = peer->progress;
1761        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1762        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER );
1763        stat->rateToClient       = tr_peerGetPieceSpeed( peer, now, TR_PEER_TO_CLIENT );
1764        stat->peerIsChoked       = peer->peerIsChoked;
1765        stat->peerIsInterested   = peer->peerIsInterested;
1766        stat->clientIsChoked     = peer->clientIsChoked;
1767        stat->clientIsInterested = peer->clientIsInterested;
1768        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1769        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1770        stat->isUploadingTo      = clientIsUploadingTo( peer );
1771        stat->isSeed             = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
1772
1773        pch = stat->flagStr;
1774        if( t->optimistic == peer ) *pch++ = 'O';
1775        if( stat->isDownloadingFrom ) *pch++ = 'D';
1776        else if( stat->clientIsInterested ) *pch++ = 'd';
1777        if( stat->isUploadingTo ) *pch++ = 'U';
1778        else if( stat->peerIsInterested ) *pch++ = 'u';
1779        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
1780        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1781        if( stat->isEncrypted ) *pch++ = 'E';
1782        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1783        if( stat->isIncoming ) *pch++ = 'I';
1784        *pch = '\0';
1785    }
1786
1787    *setmeCount = size;
1788
1789    managerUnlock( t->manager );
1790    return ret;
1791}
1792
1793/**
1794***
1795**/
1796
1797struct ChokeData
1798{
1799    tr_bool         doUnchoke;
1800    tr_bool         isInterested;
1801    tr_bool         isChoked;
1802    int             rate;
1803    tr_peer *       peer;
1804};
1805
1806static int
1807compareChoke( const void * va,
1808              const void * vb )
1809{
1810    const struct ChokeData * a = va;
1811    const struct ChokeData * b = vb;
1812
1813    if( a->rate != b->rate ) /* prefer higher overall speeds */
1814        return a->rate > b->rate ? -1 : 1;
1815
1816    if( a->isChoked != b->isChoked ) /* prefer unchoked */
1817        return a->isChoked ? 1 : -1;
1818
1819    return 0;
1820}
1821
1822static int
1823isNew( const tr_peer * peer )
1824{
1825    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1826}
1827
1828static int
1829isSame( const tr_peer * peer )
1830{
1831    return peer && peer->client && strstr( peer->client, "Transmission" );
1832}
1833
1834/**
1835***
1836**/
1837
1838static void
1839rechokeTorrent( Torrent * t )
1840{
1841    int i, size, unchokedInterested;
1842    const int peerCount = tr_ptrArraySize( &t->peers );
1843    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1844    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1845    const tr_session * session = t->manager->session;
1846    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
1847    const uint64_t now = tr_date( );
1848
1849    assert( torrentIsLocked( t ) );
1850
1851    /* sort the peers by preference and rate */
1852    for( i = 0, size = 0; i < peerCount; ++i )
1853    {
1854        tr_peer * peer = peers[i];
1855        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1856
1857        if( peer->progress >= 1.0 ) /* choke all seeds */
1858        {
1859            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1860        }
1861        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
1862        {
1863            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1864        }
1865        else if( chokeAll ) /* choke everyone if we're not uploading */
1866        {
1867            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1868        }
1869        else
1870        {
1871            struct ChokeData * n = &choke[size++];
1872            n->peer         = peer;
1873            n->isInterested = peer->peerIsInterested;
1874            n->isChoked     = peer->peerIsChoked;
1875            n->rate         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER ) * 1024;
1876        }
1877    }
1878
1879    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
1880
1881    /**
1882     * Reciprocation and number of uploads capping is managed by unchoking
1883     * the N peers which have the best upload rate and are interested.
1884     * This maximizes the client's download rate. These N peers are
1885     * referred to as downloaders, because they are interested in downloading
1886     * from the client.
1887     *
1888     * Peers which have a better upload rate (as compared to the downloaders)
1889     * but aren't interested get unchoked. If they become interested, the
1890     * downloader with the worst upload rate gets choked. If a client has
1891     * a complete file, it uses its upload rate rather than its download
1892     * rate to decide which peers to unchoke.
1893     */
1894    unchokedInterested = 0;
1895    for( i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i ) {
1896        choke[i].doUnchoke = 1;
1897        if( choke[i].isInterested )
1898            ++unchokedInterested;
1899    }
1900
1901    /* optimistic unchoke */
1902    if( i < size )
1903    {
1904        int n;
1905        struct ChokeData * c;
1906        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
1907
1908        for( ; i<size; ++i )
1909        {
1910            if( choke[i].isInterested )
1911            {
1912                const tr_peer * peer = choke[i].peer;
1913                int x = 1, y;
1914                if( isNew( peer ) ) x *= 3;
1915                if( isSame( peer ) ) x *= 3;
1916                for( y=0; y<x; ++y )
1917                    tr_ptrArrayAppend( &randPool, &choke[i] );
1918            }
1919        }
1920
1921        if(( n = tr_ptrArraySize( &randPool )))
1922        {
1923            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
1924            c->doUnchoke = 1;
1925            t->optimistic = c->peer;
1926        }
1927
1928        tr_ptrArrayDestruct( &randPool, NULL );
1929    }
1930
1931    for( i=0; i<size; ++i )
1932        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1933
1934    /* cleanup */
1935    tr_free( choke );
1936}
1937
1938static int
1939rechokePulse( void * vmgr )
1940{
1941    tr_torrent * tor = NULL;
1942    tr_peerMgr * mgr = vmgr;
1943    managerLock( mgr );
1944
1945    while(( tor = tr_torrentNext( mgr->session, tor )))
1946        if( tor->isRunning )
1947            rechokeTorrent( tor->torrentPeers );
1948
1949    managerUnlock( mgr );
1950    return TRUE;
1951}
1952
1953/***
1954****
1955****  Life and Death
1956****
1957***/
1958
1959typedef enum
1960{
1961    TR_CAN_KEEP,
1962    TR_CAN_CLOSE,
1963    TR_MUST_CLOSE,
1964}
1965tr_close_type_t;
1966
1967static tr_close_type_t
1968shouldPeerBeClosed( const Torrent    * t,
1969                    const tr_peer    * peer,
1970                    int                peerCount )
1971{
1972    const tr_torrent *       tor = t->tor;
1973    const time_t             now = time( NULL );
1974    const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1975
1976    /* if it's marked for purging, close it */
1977    if( peer->doPurge )
1978    {
1979        tordbg( t, "purging peer %s because its doPurge flag is set",
1980                tr_peerIoAddrStr( &atom->addr, atom->port ) );
1981        return TR_MUST_CLOSE;
1982    }
1983
1984    /* if we're seeding and the peer has everything we have,
1985     * and enough time has passed for a pex exchange, then disconnect */
1986    if( tr_torrentIsSeed( tor ) )
1987    {
1988        int peerHasEverything;
1989        if( atom->flags & ADDED_F_SEED_FLAG )
1990            peerHasEverything = TRUE;
1991        else if( peer->progress < tr_cpPercentDone( &tor->completion ) )
1992            peerHasEverything = FALSE;
1993        else {
1994            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( &tor->completion ) );
1995            tr_bitfieldDifference( tmp, peer->have );
1996            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
1997            tr_bitfieldFree( tmp );
1998        }
1999
2000        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 )))
2001        {
2002            tordbg( t, "purging peer %s because we're both seeds",
2003                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
2004            return TR_MUST_CLOSE;
2005        }
2006    }
2007
2008    /* disconnect if it's been too long since piece data has been transferred.
2009     * this is on a sliding scale based on number of available peers... */
2010    {
2011        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2012        /* if we have >= relaxIfFewerThan, strictness is 100%.
2013         * if we have zero connections, strictness is 0% */
2014        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
2015                               ? 1.0
2016                               : peerCount / (float)relaxStrictnessIfFewerThanN;
2017        const int lo = MIN_UPLOAD_IDLE_SECS;
2018        const int hi = MAX_UPLOAD_IDLE_SECS;
2019        const int limit = hi - ( ( hi - lo ) * strictness );
2020        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
2021/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
2022        if( idleTime > limit ) {
2023            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
2024                       tr_peerIoAddrStr( &atom->addr, atom->port ), idleTime );
2025            return TR_CAN_CLOSE;
2026        }
2027    }
2028
2029    return TR_CAN_KEEP;
2030}
2031
2032static tr_peer **
2033getPeersToClose( Torrent * t, tr_close_type_t closeType, int * setmeSize )
2034{
2035    int i, peerCount, outsize;
2036    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
2037    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2038
2039    assert( torrentIsLocked( t ) );
2040
2041    for( i = outsize = 0; i < peerCount; ++i )
2042        if( shouldPeerBeClosed( t, peers[i], peerCount ) == closeType )
2043            ret[outsize++] = peers[i];
2044
2045    *setmeSize = outsize;
2046    return ret;
2047}
2048
2049static int
2050compareCandidates( const void * va,
2051                   const void * vb )
2052{
2053    const struct peer_atom * a = *(const struct peer_atom**) va;
2054    const struct peer_atom * b = *(const struct peer_atom**) vb;
2055
2056    /* <Charles> Here we would probably want to try reconnecting to
2057     * peers that had most recently given us data. Lots of users have
2058     * trouble with resets due to their routers and/or ISPs. This way we
2059     * can quickly recover from an unwanted reset. So we sort
2060     * piece_data_time in descending order.
2061     */
2062
2063    if( a->piece_data_time != b->piece_data_time )
2064        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2065
2066    if( a->numFails != b->numFails )
2067        return a->numFails < b->numFails ? -1 : 1;
2068
2069    if( a->time != b->time )
2070        return a->time < b->time ? -1 : 1;
2071
2072    /* all other things being equal, prefer peers whose
2073     * information comes from a more reliable source */
2074    if( a->from != b->from )
2075        return a->from < b->from ? -1 : 1;
2076
2077    return 0;
2078}
2079
2080static int
2081getReconnectIntervalSecs( const struct peer_atom * atom )
2082{
2083    int          sec;
2084    const time_t now = time( NULL );
2085
2086    /* if we were recently connected to this peer and transferring piece
2087     * data, try to reconnect to them sooner rather that later -- we don't
2088     * want network troubles to get in the way of a good peer. */
2089    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2090        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2091
2092    /* don't allow reconnects more often than our minimum */
2093    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2094        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2095
2096    /* otherwise, the interval depends on how many times we've tried
2097     * and failed to connect to the peer */
2098    else switch( atom->numFails ) {
2099        case 0: sec = 0; break;
2100        case 1: sec = 5; break;
2101        case 2: sec = 2 * 60; break;
2102        case 3: sec = 15 * 60; break;
2103        case 4: sec = 30 * 60; break;
2104        case 5: sec = 60 * 60; break;
2105        default: sec = 120 * 60; break;
2106    }
2107
2108    return sec;
2109}
2110
2111static struct peer_atom **
2112getPeerCandidates( Torrent * t, int * setmeSize )
2113{
2114    int                 i, atomCount, retCount;
2115    struct peer_atom ** atoms;
2116    struct peer_atom ** ret;
2117    const time_t        now = time( NULL );
2118    const int           seed = tr_torrentIsSeed( t->tor );
2119
2120    assert( torrentIsLocked( t ) );
2121
2122    atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
2123    ret = tr_new( struct peer_atom*, atomCount );
2124    for( i = retCount = 0; i < atomCount; ++i )
2125    {
2126        int                interval;
2127        struct peer_atom * atom = atoms[i];
2128
2129        /* peer fed us too much bad data ... we only keep it around
2130         * now to weed it out in case someone sends it to us via pex */
2131        if( atom->myflags & MYFLAG_BANNED )
2132            continue;
2133
2134        /* peer was unconnectable before, so we're not going to keep trying.
2135         * this is needs a separate flag from `banned', since if they try
2136         * to connect to us later, we'll let them in */
2137        if( atom->myflags & MYFLAG_UNREACHABLE )
2138            continue;
2139
2140        /* we don't need two connections to the same peer... */
2141        if( peerIsInUse( t, &atom->addr ) )
2142            continue;
2143
2144        /* no need to connect if we're both seeds... */
2145        if( seed && ( ( atom->flags & ADDED_F_SEED_FLAG ) ||
2146                      ( atom->uploadOnly == UPLOAD_ONLY_YES ) ) )
2147            continue;
2148
2149        /* don't reconnect too often */
2150        interval = getReconnectIntervalSecs( atom );
2151        if( ( now - atom->time ) < interval )
2152        {
2153            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2154                    i, tr_peerIoAddrStr( &atom->addr, atom->port ), interval );
2155            continue;
2156        }
2157
2158        /* Don't connect to peers in our blocklist */
2159        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2160            continue;
2161
2162        ret[retCount++] = atom;
2163    }
2164
2165    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2166    *setmeSize = retCount;
2167    return ret;
2168}
2169
2170static void
2171closePeer( Torrent * t, tr_peer * peer )
2172{
2173    struct peer_atom * atom;
2174
2175    assert( t != NULL );
2176    assert( peer != NULL );
2177
2178    /* if we transferred piece data, then they might be good peers,
2179       so reset their `numFails' weight to zero.  otherwise we connected
2180       to them fruitlessly, so mark it as another fail */
2181    atom = getExistingAtom( t, &peer->addr );
2182    if( atom->piece_data_time )
2183        atom->numFails = 0;
2184    else
2185        ++atom->numFails;
2186
2187    tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
2188    removePeer( t, peer );
2189}
2190
2191static void
2192reconnectTorrent( Torrent * t )
2193{
2194    static time_t prevTime = 0;
2195    static int    newConnectionsThisSecond = 0;
2196    time_t        now;
2197
2198    now = time( NULL );
2199    if( prevTime != now )
2200    {
2201        prevTime = now;
2202        newConnectionsThisSecond = 0;
2203    }
2204
2205    if( !t->isRunning )
2206    {
2207        removeAllPeers( t );
2208    }
2209    else
2210    {
2211        int i;
2212        int canCloseCount;
2213        int mustCloseCount;
2214        int candidateCount;
2215        int maxCandidates;
2216        struct tr_peer ** canClose = getPeersToClose( t, TR_CAN_CLOSE, &canCloseCount );
2217        struct tr_peer ** mustClose = getPeersToClose( t, TR_MUST_CLOSE, &mustCloseCount );
2218        struct peer_atom ** candidates = getPeerCandidates( t, &candidateCount );
2219
2220        tordbg( t, "reconnect pulse for [%s]: "
2221                   "%d must-close connections, "
2222                   "%d can-close connections, "
2223                   "%d connection candidates, "
2224                   "%d atoms, "
2225                   "max per pulse is %d",
2226                   t->tor->info.name,
2227                   mustCloseCount,
2228                   canCloseCount,
2229                   candidateCount,
2230                   tr_ptrArraySize( &t->pool ),
2231                   MAX_RECONNECTIONS_PER_PULSE );
2232
2233        /* disconnect the really bad peers */
2234        for( i=0; i<mustCloseCount; ++i )
2235            closePeer( t, mustClose[i] );
2236
2237        /* decide how many peers can we try to add in this pass */
2238        maxCandidates = candidateCount;
2239        maxCandidates = MIN( maxCandidates, MAX_RECONNECTIONS_PER_PULSE );
2240        maxCandidates = MIN( maxCandidates, getMaxPeerCount( t->tor ) - getPeerCount( t ) );
2241        maxCandidates = MIN( maxCandidates, MAX_CONNECTIONS_PER_SECOND - newConnectionsThisSecond );
2242
2243        /* maybe disconnect some lesser peers, if we have candidates to replace them with */
2244        for( i=0; ( i<canCloseCount ) && ( i<maxCandidates ); ++i )
2245            closePeer( t, canClose[i] );
2246
2247        tordbg( t, "candidateCount is %d, MAX_RECONNECTIONS_PER_PULSE is %d,"
2248                   " getPeerCount(t) is %d, getMaxPeerCount(t) is %d, "
2249                   "newConnectionsThisSecond is %d, MAX_CONNECTIONS_PER_SECOND is %d",
2250                   candidateCount,
2251                   MAX_RECONNECTIONS_PER_PULSE,
2252                   getPeerCount( t ),
2253                   getMaxPeerCount( t->tor ),
2254                   newConnectionsThisSecond, MAX_CONNECTIONS_PER_SECOND );
2255
2256        /* add some new ones */
2257        for( i=0; i<maxCandidates; ++i )
2258        {
2259            tr_peerMgr        * mgr = t->manager;
2260            struct peer_atom  * atom = candidates[i];
2261            tr_peerIo         * io;
2262
2263            tordbg( t, "Starting an OUTGOING connection with %s",
2264                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2265
2266            io = tr_peerIoNewOutgoing( mgr->session, mgr->session->bandwidth, &atom->addr, atom->port, t->hash );
2267
2268            if( io == NULL )
2269            {
2270                tordbg( t, "peerIo not created; marking peer %s as unreachable",
2271                        tr_peerIoAddrStr( &atom->addr, atom->port ) );
2272                atom->myflags |= MYFLAG_UNREACHABLE;
2273            }
2274            else
2275            {
2276                tr_handshake * handshake = tr_handshakeNew( io,
2277                                                            mgr->session->encryptionMode,
2278                                                            myHandshakeDoneCB,
2279                                                            mgr );
2280
2281                assert( tr_peerIoGetTorrentHash( io ) );
2282
2283                tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewOutgoing() */
2284
2285                ++newConnectionsThisSecond;
2286
2287                tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
2288                                         handshakeCompare );
2289            }
2290
2291            atom->time = time( NULL );
2292        }
2293
2294        /* cleanup */
2295        tr_free( candidates );
2296        tr_free( mustClose );
2297        tr_free( canClose );
2298    }
2299}
2300
2301static int
2302reconnectPulse( void * vmgr )
2303{
2304    tr_torrent * tor = NULL;
2305    tr_peerMgr * mgr = vmgr;
2306    managerLock( mgr );
2307
2308    while(( tor = tr_torrentNext( mgr->session, tor )))
2309        if( tor->isRunning )
2310            reconnectTorrent( tor->torrentPeers );
2311
2312    managerUnlock( mgr );
2313    return TRUE;
2314}
2315
2316/****
2317*****
2318*****  BANDWIDTH ALLOCATION
2319*****
2320****/
2321
2322static void
2323pumpAllPeers( tr_peerMgr * mgr )
2324{
2325    tr_torrent * tor = NULL;
2326
2327    while(( tor = tr_torrentNext( mgr->session, tor )))
2328    {
2329        int j;
2330        Torrent * t = tor->torrentPeers;
2331
2332        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
2333        {
2334            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
2335            tr_peerMsgsPulse( peer->msgs );
2336        }
2337    }
2338}
2339
2340static int
2341bandwidthPulse( void * vmgr )
2342{
2343    tr_peerMgr * mgr = vmgr;
2344    managerLock( mgr );
2345
2346    /* FIXME: this next line probably isn't necessary... */
2347    pumpAllPeers( mgr );
2348
2349    /* allocate bandwidth to the peers */
2350    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
2351    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
2352
2353    managerUnlock( mgr );
2354    return TRUE;
2355}
Note: See TracBrowser for help on using the repository browser.