source: trunk/libtransmission/peer-mgr.c @ 7591

Last change on this file since 7591 was 7591, checked in by charles, 12 years ago

(trunk libT) inline a few more torrent methods

  • Property svn:keywords set to Date Rev Author Id
File size: 67.9 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 7591 2009-01-03 00:25:27Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h> /* memcpy, memcmp, strstr */
16#include <stdlib.h> /* qsort */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "session.h"
23#include "bandwidth.h"
24#include "bencode.h"
25#include "blocklist.h"
26#include "clients.h"
27#include "completion.h"
28#include "crypto.h"
29#include "handshake.h"
30#include "inout.h" /* tr_ioTestPiece */
31#include "net.h"
32#include "peer-io.h"
33#include "peer-mgr.h"
34#include "peer-mgr-private.h"
35#include "peer-msgs.h"
36#include "ptrarray.h"
37#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
38#include "torrent.h"
39#include "trevent.h"
40#include "utils.h"
41#include "webseed.h"
42
43enum
44{
45    /* how frequently to change which peers are choked */
46    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
47
48    /* minimum interval for refilling peers' request lists */
49    REFILL_PERIOD_MSEC = 333,
50
51    /* when many peers are available, keep idle ones this long */
52    MIN_UPLOAD_IDLE_SECS = ( 30 ),
53
54    /* when few peers are available, keep idle ones this long */
55    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
56
57    /* how frequently to decide which peers live and die */
58    RECONNECT_PERIOD_MSEC = ( 2 * 1000 ),
59   
60    /* how frequently to reallocate bandwidth */
61    BANDWIDTH_PERIOD_MSEC = 500,
62
63    /* max # of peers to ask fer per torrent per reconnect pulse */
64    MAX_RECONNECTIONS_PER_PULSE = 4,
65
66    /* max number of peers to ask for per second overall.
67    * this throttle is to avoid overloading the router */
68    MAX_CONNECTIONS_PER_SECOND = 8,
69
70    /* number of unchoked peers per torrent.
71     * FIXME: this probably ought to be configurable */
72    MAX_UNCHOKED_PEERS = 14,
73
74    /* number of bad pieces a peer is allowed to send before we ban them */
75    MAX_BAD_PIECES_PER_PEER = 5,
76
77    /* use for bitwise operations w/peer_atom.myflags */
78    MYFLAG_BANNED = 1,
79
80    /* unreachable for now... but not banned.
81     * if they try to connect to us it's okay */
82    MYFLAG_UNREACHABLE = 2,
83
84    /* the minimum we'll wait before attempting to reconnect to a peer */
85    MINIMUM_RECONNECT_INTERVAL_SECS = 5
86};
87
88
89/**
90***
91**/
92
93enum
94{
95    UPLOAD_ONLY_UKNOWN,
96    UPLOAD_ONLY_YES,
97    UPLOAD_ONLY_NO
98};
99
100/**
101 * Peer information that should be kept even before we've connected and
102 * after we've disconnected.  These are kept in a pool of peer_atoms to decide
103 * which ones would make good candidates for connecting to, and to watch out
104 * for banned peers.
105 *
106 * @see tr_peer
107 * @see tr_peermsgs
108 */
109struct peer_atom
110{
111    uint8_t     from;
112    uint8_t     flags;       /* these match the added_f flags */
113    uint8_t     myflags;     /* flags that aren't defined in added_f */
114    uint8_t     uploadOnly;  /* UPLOAD_ONLY_ */
115    tr_port     port;
116    uint16_t    numFails;
117    tr_address  addr;
118    time_t      time;        /* when the peer's connection status last changed */
119    time_t      piece_data_time;
120};
121
122typedef struct
123{
124    tr_bool         isRunning;
125
126    uint8_t         hash[SHA_DIGEST_LENGTH];
127    int         *   pendingRequestCount;
128    tr_ptrArray     outgoingHandshakes; /* tr_handshake */
129    tr_ptrArray     pool; /* struct peer_atom */
130    tr_ptrArray     peers; /* tr_peer */
131    tr_ptrArray     webseeds; /* tr_webseed */
132    tr_timer *      reconnectTimer;
133    tr_timer *      rechokeTimer;
134    tr_timer *      refillTimer;
135    tr_torrent *    tor;
136    tr_peer *       optimistic; /* the optimistic peer, or NULL if none */
137
138    struct tr_peerMgr * manager;
139}
140Torrent;
141
142struct tr_peerMgr
143{
144    tr_session      * session;
145    tr_ptrArray       torrents; /* Torrent */
146    tr_ptrArray       incomingHandshakes; /* tr_handshake */
147    tr_ptrArray       finishedHandshakes; /* tr_handshake */
148    tr_timer        * bandwidthTimer;
149};
150
151#define tordbg( t, ... ) \
152    do { \
153        if( tr_deepLoggingIsActive( ) ) \
154            tr_deepLog( __FILE__, __LINE__, t->tor->info.name, __VA_ARGS__ ); \
155    } while( 0 )
156
157#define dbgmsg( ... ) \
158    do { \
159        if( tr_deepLoggingIsActive( ) ) \
160            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
161    } while( 0 )
162
163/**
164***
165**/
166
167static inline void
168managerLock( const struct tr_peerMgr * manager )
169{
170    tr_globalLock( manager->session );
171}
172
173static inline void
174managerUnlock( const struct tr_peerMgr * manager )
175{
176    tr_globalUnlock( manager->session );
177}
178
179static inline void
180torrentLock( Torrent * torrent )
181{
182    managerLock( torrent->manager );
183}
184
185static inline void
186torrentUnlock( Torrent * torrent )
187{
188    managerUnlock( torrent->manager );
189}
190
191static inline int
192torrentIsLocked( const Torrent * t )
193{
194    return tr_globalIsLocked( t->manager->session );
195}
196
197/**
198***
199**/
200
201static int
202handshakeCompareToAddr( const void * va, const void * vb )
203{
204    const tr_handshake * a = va;
205
206    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
207}
208
209static int
210handshakeCompare( const void * a, const void * b )
211{
212    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
213}
214
215static tr_handshake*
216getExistingHandshake( tr_ptrArray      * handshakes,
217                      const tr_address * addr )
218{
219    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
220}
221
222static int
223comparePeerAtomToAddress( const void * va, const void * vb )
224{
225    const struct peer_atom * a = va;
226
227    return tr_compareAddresses( &a->addr, vb );
228}
229
230static int
231comparePeerAtoms( const void * va, const void * vb )
232{
233    const struct peer_atom * b = vb;
234
235    return comparePeerAtomToAddress( va, &b->addr );
236}
237
238/**
239***
240**/
241
242static int
243torrentCompare( const void * va,
244                const void * vb )
245{
246    const Torrent * a = va;
247    const Torrent * b = vb;
248
249    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
250}
251
252static int
253torrentCompareToHash( const void * va,
254                      const void * vb )
255{
256    const Torrent * a = va;
257    const uint8_t * b_hash = vb;
258
259    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
260}
261
262static Torrent*
263getExistingTorrent( tr_peerMgr *    manager,
264                    const uint8_t * hash )
265{
266    return (Torrent*) tr_ptrArrayFindSorted( &manager->torrents,
267                                             hash,
268                                             torrentCompareToHash );
269}
270
271static int
272peerCompare( const void * va, const void * vb )
273{
274    const tr_peer * a = va;
275    const tr_peer * b = vb;
276
277    return tr_compareAddresses( &a->addr, &b->addr );
278}
279
280static int
281peerCompareToAddr( const void * va, const void * vb )
282{
283    const tr_peer * a = va;
284
285    return tr_compareAddresses( &a->addr, vb );
286}
287
288static tr_peer*
289getExistingPeer( Torrent          * torrent,
290                 const tr_address * addr )
291{
292    assert( torrentIsLocked( torrent ) );
293    assert( addr );
294
295    return tr_ptrArrayFindSorted( &torrent->peers, addr, peerCompareToAddr );
296}
297
298static struct peer_atom*
299getExistingAtom( const Torrent    * t,
300                 const tr_address * addr )
301{
302    Torrent * tt = (Torrent*)t;
303    assert( torrentIsLocked( t ) );
304    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
305}
306
307static tr_bool
308peerIsInUse( const Torrent    * ct,
309             const tr_address * addr )
310{
311    Torrent * t = (Torrent*) ct;
312
313    assert( torrentIsLocked ( t ) );
314
315    return getExistingPeer( t, addr )
316        || getExistingHandshake( &t->outgoingHandshakes, addr )
317        || getExistingHandshake( &t->manager->incomingHandshakes, addr );
318}
319
320static tr_peer*
321peerConstructor( const tr_address * addr )
322{
323    tr_peer * p;
324    p = tr_new0( tr_peer, 1 );
325    p->addr = *addr;
326    return p;
327}
328
329static tr_peer*
330getPeer( Torrent          * torrent,
331         const tr_address * addr )
332{
333    tr_peer * peer;
334
335    assert( torrentIsLocked( torrent ) );
336
337    peer = getExistingPeer( torrent, addr );
338
339    if( peer == NULL )
340    {
341        peer = peerConstructor( addr );
342        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
343    }
344
345    return peer;
346}
347
348static void
349peerDestructor( tr_peer * peer )
350{
351    assert( peer );
352
353    if( peer->msgs != NULL )
354    {
355        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
356        tr_peerMsgsFree( peer->msgs );
357    }
358
359    tr_peerIoFree( peer->io );
360
361    tr_bitfieldFree( peer->have );
362    tr_bitfieldFree( peer->blame );
363    tr_free( peer->client );
364
365    tr_free( peer );
366}
367
368static void
369removePeer( Torrent * t,
370            tr_peer * peer )
371{
372    tr_peer *          removed;
373    struct peer_atom * atom;
374
375    assert( torrentIsLocked( t ) );
376
377    atom = getExistingAtom( t, &peer->addr );
378    assert( atom );
379    atom->time = time( NULL );
380
381    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
382    assert( removed == peer );
383    peerDestructor( removed );
384}
385
386static void
387removeAllPeers( Torrent * t )
388{
389    while( !tr_ptrArrayEmpty( &t->peers ) )
390        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
391}
392
393static void
394torrentDestructor( void * vt )
395{
396    Torrent * t = vt;
397    uint8_t   hash[SHA_DIGEST_LENGTH];
398
399    assert( t );
400    assert( !t->isRunning );
401    assert( torrentIsLocked( t ) );
402    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
403    assert( tr_ptrArrayEmpty( &t->peers ) );
404
405    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
406
407    tr_timerFree( &t->reconnectTimer );
408    tr_timerFree( &t->rechokeTimer );
409    tr_timerFree( &t->refillTimer );
410
411    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
412    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
413    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
414    tr_ptrArrayDestruct( &t->peers, NULL );
415
416    tr_free( t->pendingRequestCount );
417    tr_free( t );
418}
419
420static void peerCallbackFunc( void * vpeer,
421                              void * vevent,
422                              void * vt );
423
424static Torrent*
425torrentConstructor( tr_peerMgr * manager,
426                    tr_torrent * tor )
427{
428    int       i;
429    Torrent * t;
430
431    t = tr_new0( Torrent, 1 );
432    t->manager = manager;
433    t->tor = tor;
434    t->pool = TR_PTR_ARRAY_INIT;
435    t->peers = TR_PTR_ARRAY_INIT;
436    t->webseeds = TR_PTR_ARRAY_INIT;
437    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
438    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
439
440    for( i = 0; i < tor->info.webseedCount; ++i )
441    {
442        tr_webseed * w =
443            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
444        tr_ptrArrayAppend( &t->webseeds, w );
445    }
446
447    return t;
448}
449
450
451static int bandwidthPulse( void * vmgr );
452
453
454tr_peerMgr*
455tr_peerMgrNew( tr_session * session )
456{
457    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
458
459    m->session = session;
460    m->torrents = TR_PTR_ARRAY_INIT;
461    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
462    m->finishedHandshakes = TR_PTR_ARRAY_INIT;
463    m->bandwidthTimer = tr_timerNew( session, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
464    return m;
465}
466
467void
468tr_peerMgrFree( tr_peerMgr * manager )
469{
470    tr_handshake * handshake;
471
472    managerLock( manager );
473
474    tr_timerFree( &manager->bandwidthTimer );
475
476    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
477     * the item from manager->handshakes, so this is a little roundabout... */
478    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
479        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
480
481    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
482
483    while(( handshake = tr_ptrArrayPop( &manager->finishedHandshakes )))
484        tr_handshakeFree( handshake );
485
486    tr_ptrArrayDestruct( &manager->finishedHandshakes, NULL );
487
488    /* free the torrents. */
489    tr_ptrArrayDestruct( &manager->torrents, torrentDestructor );
490
491    managerUnlock( manager );
492    tr_free( manager );
493}
494
495static int
496clientIsDownloadingFrom( const tr_peer * peer )
497{
498    return peer->clientIsInterested && !peer->clientIsChoked;
499}
500
501static int
502clientIsUploadingTo( const tr_peer * peer )
503{
504    return peer->peerIsInterested && !peer->peerIsChoked;
505}
506
507/***
508****
509***/
510
511tr_bool
512tr_peerMgrPeerIsSeed( const tr_peerMgr  * mgr,
513                      const uint8_t     * torrentHash,
514                      const tr_address  * addr )
515{
516    tr_bool isSeed = FALSE;
517    const Torrent * t = NULL;
518    const struct peer_atom * atom = NULL;
519
520    t = getExistingTorrent( (tr_peerMgr*)mgr, torrentHash );
521    if( t )
522        atom = getExistingAtom( t, addr );
523    if( atom )
524        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
525
526    return isSeed;
527}
528
529/****
530*****
531*****  REFILL
532*****
533****/
534
535static void
536assertValidPiece( Torrent * t, tr_piece_index_t piece )
537{
538    assert( t );
539    assert( t->tor );
540    assert( piece < t->tor->info.pieceCount );
541}
542
543static int
544getPieceRequests( Torrent * t, tr_piece_index_t piece )
545{
546    assertValidPiece( t, piece );
547
548    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
549}
550
551static void
552incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
553{
554    assertValidPiece( t, piece );
555
556    if( t->pendingRequestCount == NULL )
557        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
558    t->pendingRequestCount[piece]++;
559}
560
561static void
562decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
563{
564    assertValidPiece( t, piece );
565
566    if( t->pendingRequestCount )
567        t->pendingRequestCount[piece]--;
568}
569
570struct tr_refill_piece
571{
572    tr_priority_t    priority;
573    uint32_t         piece;
574    uint32_t         peerCount;
575    int              random;
576    int              pendingRequestCount;
577    int              missingBlockCount;
578};
579
580static int
581compareRefillPiece( const void * aIn, const void * bIn )
582{
583    const struct tr_refill_piece * a = aIn;
584    const struct tr_refill_piece * b = bIn;
585
586    /* if one piece has a higher priority, it goes first */
587    if( a->priority != b->priority )
588        return a->priority > b->priority ? -1 : 1;
589
590    /* have a per-priority endgame */
591    if( a->pendingRequestCount != b->pendingRequestCount )
592        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
593
594    /* fewer missing pieces goes first */
595    if( a->missingBlockCount != b->missingBlockCount )
596        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
597
598    /* otherwise if one has fewer peers, it goes first */
599    if( a->peerCount != b->peerCount )
600        return a->peerCount < b->peerCount ? -1 : 1;
601
602    /* otherwise go with our random seed */
603    if( a->random != b->random )
604        return a->random < b->random ? -1 : 1;
605
606    return 0;
607}
608
609static tr_piece_index_t *
610getPreferredPieces( Torrent * t, tr_piece_index_t * pieceCount )
611{
612    const tr_torrent  * tor = t->tor;
613    const tr_info     * inf = &tor->info;
614    tr_piece_index_t    i;
615    tr_piece_index_t    poolSize = 0;
616    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
617    int                 peerCount;
618    const tr_peer    ** peers;
619
620    assert( torrentIsLocked( t ) );
621
622    peers = (const tr_peer**) TR_PTR_ARRAY_DATA( &t->peers );
623    peerCount = TR_PTR_ARRAY_LENGTH( &t->peers );
624
625    /* make a list of the pieces that we want but don't have */
626    for( i = 0; i < inf->pieceCount; ++i )
627        if( !tor->info.pieces[i].dnd
628                && !tr_cpPieceIsComplete( &tor->completion, i ) )
629            pool[poolSize++] = i;
630
631    /* sort the pool by which to request next */
632    if( poolSize > 1 )
633    {
634        tr_piece_index_t j;
635        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
636
637        for( j = 0; j < poolSize; ++j )
638        {
639            int k;
640            const tr_piece_index_t piece = pool[j];
641            struct tr_refill_piece * setme = p + j;
642
643            setme->piece = piece;
644            setme->priority = inf->pieces[piece].priority;
645            setme->peerCount = 0;
646            setme->random = tr_cryptoWeakRandInt( INT_MAX );
647            setme->pendingRequestCount = getPieceRequests( t, piece );
648            setme->missingBlockCount
649                         = tr_cpMissingBlocksInPiece( &tor->completion, piece );
650
651            for( k = 0; k < peerCount; ++k )
652            {
653                const tr_peer * peer = peers[k];
654                if( peer->peerIsInterested
655                        && !peer->clientIsChoked
656                        && tr_bitfieldHas( peer->have, piece ) )
657                    ++setme->peerCount;
658            }
659        }
660
661        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
662               compareRefillPiece );
663
664        for( j = 0; j < poolSize; ++j )
665            pool[j] = p[j].piece;
666
667        tr_free( p );
668    }
669
670    *pieceCount = poolSize;
671    return pool;
672}
673
674struct tr_blockIterator
675{
676    Torrent * t;
677    tr_block_index_t blockIndex, blockCount, *blocks;
678    tr_piece_index_t pieceIndex, pieceCount, *pieces;
679};
680
681static struct tr_blockIterator*
682blockIteratorNew( Torrent * t )
683{
684    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
685    i->t = t;
686    i->pieces = getPreferredPieces( t, &i->pieceCount );
687    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCountInPiece );
688    return i;
689}
690
691static int
692blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
693{
694    int found;
695    Torrent * t = i->t;
696    tr_torrent * tor = t->tor;
697
698    while( ( i->blockIndex == i->blockCount )
699        && ( i->pieceIndex < i->pieceCount ) )
700    {
701        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
702        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
703        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
704        tr_block_index_t block;
705
706        assert( index < tor->info.pieceCount );
707
708        i->blockCount = 0;
709        i->blockIndex = 0;
710        for( block=b; block!=e; ++block )
711            if( !tr_cpBlockIsComplete( &tor->completion, block ) )
712                i->blocks[i->blockCount++] = block;
713    }
714
715    assert( i->blockCount <= tor->blockCountInPiece );
716
717    if(( found = ( i->blockIndex < i->blockCount )))
718        *setme = i->blocks[i->blockIndex++];
719
720    return found;
721}
722
723static void
724blockIteratorFree( struct tr_blockIterator * i )
725{
726    tr_free( i->blocks );
727    tr_free( i->pieces );
728    tr_free( i );
729}
730
731static tr_peer**
732getPeersUploadingToClient( Torrent * t,
733                           int *     setmeCount )
734{
735    int j;
736    int peerCount = 0;
737    int retCount = 0;
738    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
739    tr_peer ** ret = tr_new( tr_peer *, peerCount );
740
741    j = 0; /* this is a temporary test to make sure we walk through all the peers */
742    if( peerCount )
743    {
744        /* Get a list of peers we're downloading from.
745           Pick a different starting point each time so all peers
746           get a chance at being the first in line */
747        const int fencepost = tr_cryptoWeakRandInt( peerCount );
748        int i = fencepost;
749        do {
750            if( clientIsDownloadingFrom( peers[i] ) )
751                ret[retCount++] = peers[i];
752            i = ( i + 1 ) % peerCount;
753            ++j;
754        } while( i != fencepost );
755    }
756    assert( j == peerCount );
757    *setmeCount = retCount;
758    return ret;
759}
760
761static uint32_t
762getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
763{
764    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
765    const uint64_t blockPos = tor->blockSize * b;
766    assert( blockPos >= piecePos );
767    return (uint32_t)( blockPos - piecePos );
768}
769
770static int
771refillPulse( void * vtorrent )
772{
773    tr_block_index_t block;
774    int peerCount;
775    int webseedCount;
776    tr_peer ** peers;
777    tr_webseed ** webseeds;
778    struct tr_blockIterator * blockIterator;
779    Torrent * t = vtorrent;
780    tr_torrent * tor = t->tor;
781
782    if( !t->isRunning )
783        return TRUE;
784    if( tr_torrentIsSeed( t->tor ) )
785        return TRUE;
786
787    torrentLock( t );
788    tordbg( t, "Refilling Request Buffers..." );
789
790    blockIterator = blockIteratorNew( t );
791    peers = getPeersUploadingToClient( t, &peerCount );
792    webseedCount = tr_ptrArraySize( &t->webseeds );
793    webseeds = tr_memdup( TR_PTR_ARRAY_DATA( &t->webseeds ),
794                          webseedCount * sizeof( tr_webseed* ) );
795
796    while( ( webseedCount || peerCount )
797        && blockIteratorNext( blockIterator, &block ) )
798    {
799        int j;
800        int handled = FALSE;
801
802        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
803        const uint32_t offset = getBlockOffsetInPiece( tor, block );
804        const uint32_t length = tr_torBlockCountBytes( tor, block );
805
806        /* find a peer who can ask for this block */
807        for( j=0; !handled && j<peerCount; )
808        {
809            const tr_addreq_t val = tr_peerMsgsAddRequest( peers[j]->msgs, index, offset, length );
810            switch( val )
811            {
812                case TR_ADDREQ_FULL:
813                case TR_ADDREQ_CLIENT_CHOKED:
814                    peers[j] = peers[--peerCount];
815                    break;
816
817                case TR_ADDREQ_MISSING:
818                case TR_ADDREQ_DUPLICATE:
819                    ++j;
820                    break;
821
822                case TR_ADDREQ_OK:
823                    incrementPieceRequests( t, index );
824                    handled = TRUE;
825                    break;
826
827                default:
828                    assert( 0 && "unhandled value" );
829                    break;
830            }
831        }
832
833        /* maybe one of the webseeds can do it */
834        for( j=0; !handled && j<webseedCount; )
835        {
836            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j], index, offset, length );
837            switch( val )
838            {
839                case TR_ADDREQ_FULL:
840                    webseeds[j] = webseeds[--webseedCount];
841                    break;
842
843                case TR_ADDREQ_OK:
844                    incrementPieceRequests( t, index );
845                    handled = TRUE;
846                    break;
847
848                default:
849                    assert( 0 && "unhandled value" );
850                    break;
851            }
852        }
853    }
854
855    /* cleanup */
856    blockIteratorFree( blockIterator );
857    tr_free( webseeds );
858    tr_free( peers );
859
860    t->refillTimer = NULL;
861    torrentUnlock( t );
862    return FALSE;
863}
864
865static void
866broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
867{
868    size_t i;
869    size_t peerCount;
870    tr_peer ** peers;
871
872    assert( torrentIsLocked( t ) );
873
874    tordbg( t, "got a block; cancelling any duplicate requests from peers %"PRIu32":%"PRIu32"->%"PRIu32, index, offset, length );
875
876    peerCount = TR_PTR_ARRAY_LENGTH( &t->peers );
877    peers = (tr_peer**) TR_PTR_ARRAY_DATA( &t->peers );
878    for( i=0; i<peerCount; ++i )
879        if( peers[i]->msgs )
880            tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
881}
882
883static void
884addStrike( Torrent * t,
885           tr_peer * peer )
886{
887    tordbg( t, "increasing peer %s strike count to %d",
888            tr_peerIoAddrStr( &peer->addr,
889                              peer->port ), peer->strikes + 1 );
890
891    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
892    {
893        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
894        atom->myflags |= MYFLAG_BANNED;
895        peer->doPurge = 1;
896        tordbg( t, "banning peer %s", tr_peerIoAddrStr( &atom->addr, atom->port ) );
897    }
898}
899
900static void
901gotBadPiece( Torrent *        t,
902             tr_piece_index_t pieceIndex )
903{
904    tr_torrent *   tor = t->tor;
905    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
906
907    tor->corruptCur += byteCount;
908    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
909}
910
911static void
912refillSoon( Torrent * t )
913{
914    if( t->refillTimer == NULL )
915        t->refillTimer = tr_timerNew( t->manager->session,
916                                      refillPulse, t,
917                                      REFILL_PERIOD_MSEC );
918}
919
920static void
921peerSuggestedPiece( Torrent            * t UNUSED,
922                    tr_peer            * peer UNUSED,
923                    tr_piece_index_t     pieceIndex UNUSED,
924                    int                  isFastAllowed UNUSED )
925{
926#if 0
927    assert( t );
928    assert( peer );
929    assert( peer->msgs );
930
931    /* is this a valid piece? */
932    if(  pieceIndex >= t->tor->info.pieceCount )
933        return;
934
935    /* don't ask for it if we've already got it */
936    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
937        return;
938
939    /* don't ask for it if they don't have it */
940    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
941        return;
942
943    /* don't ask for it if we're choked and it's not fast */
944    if( !isFastAllowed && peer->clientIsChoked )
945        return;
946
947    /* request the blocks that we don't have in this piece */
948    {
949        tr_block_index_t block;
950        const tr_torrent * tor = t->tor;
951        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
952        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
953
954        for( block=start; block<end; ++block )
955        {
956            if( !tr_cpBlockIsComplete( tor->completion, block ) )
957            {
958                const uint32_t offset = getBlockOffsetInPiece( tor, block );
959                const uint32_t length = tr_torBlockCountBytes( tor, block );
960                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
961                incrementPieceRequests( t, pieceIndex );
962            }
963        }
964    }
965#endif
966}
967
968static void
969peerCallbackFunc( void * vpeer, void * vevent, void * vt )
970{
971    tr_peer * peer = vpeer; /* may be NULL if peer is a webseed */
972    Torrent * t = vt;
973    const tr_peer_event * e = vevent;
974
975    torrentLock( t );
976
977    switch( e->eventType )
978    {
979        case TR_PEER_UPLOAD_ONLY:
980            /* update our atom */
981            if( peer ) {
982                struct peer_atom * a = getExistingAtom( t, &peer->addr );
983                a->uploadOnly = e->uploadOnly ? UPLOAD_ONLY_YES : UPLOAD_ONLY_NO;
984            }
985            break;
986
987        case TR_PEER_NEED_REQ:
988            refillSoon( t );
989            break;
990
991        case TR_PEER_CANCEL:
992            decrementPieceRequests( t, e->pieceIndex );
993            break;
994
995        case TR_PEER_PEER_GOT_DATA:
996        {
997            const time_t now = time( NULL );
998            tr_torrent * tor = t->tor;
999
1000            tor->activityDate = now;
1001
1002            if( e->wasPieceData )
1003                tor->uploadedCur += e->length;
1004
1005            /* update the stats */
1006            if( e->wasPieceData )
1007                tr_statsAddUploaded( tor->session, e->length );
1008
1009            /* update our atom */
1010            if( peer ) {
1011                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1012                if( e->wasPieceData )
1013                    a->piece_data_time = now;
1014            }
1015
1016            break;
1017        }
1018
1019        case TR_PEER_CLIENT_GOT_SUGGEST:
1020            if( peer )
1021                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1022            break;
1023
1024        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1025            if( peer )
1026                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1027            break;
1028
1029        case TR_PEER_CLIENT_GOT_DATA:
1030        {
1031            const time_t now = time( NULL );
1032            tr_torrent * tor = t->tor;
1033            tor->activityDate = now;
1034
1035            /* only add this to downloadedCur if we got it from a peer --
1036             * webseeds shouldn't count against our ratio.  As one tracker
1037             * admin put it, "Those pieces are downloaded directly from the
1038             * content distributor, not the peers, it is the tracker's job
1039             * to manage the swarms, not the web server and does not fit
1040             * into the jurisdiction of the tracker." */
1041            if( peer && e->wasPieceData )
1042                tor->downloadedCur += e->length;
1043
1044            /* update the stats */ 
1045            if( e->wasPieceData )
1046                tr_statsAddDownloaded( tor->session, e->length );
1047
1048            /* update our atom */
1049            if( peer ) {
1050                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1051                if( e->wasPieceData )
1052                    a->piece_data_time = now;
1053            }
1054
1055            break;
1056        }
1057
1058        case TR_PEER_PEER_PROGRESS:
1059        {
1060            if( peer )
1061            {
1062                struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1063                const int peerIsSeed = e->progress >= 1.0;
1064                if( peerIsSeed ) {
1065                    tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1066                    atom->flags |= ADDED_F_SEED_FLAG;
1067                } else {
1068                    tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1069                    atom->flags &= ~ADDED_F_SEED_FLAG;
1070                }
1071            }
1072            break;
1073        }
1074
1075        case TR_PEER_CLIENT_GOT_BLOCK:
1076        {
1077            tr_torrent *     tor = t->tor;
1078
1079            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1080
1081            tr_cpBlockAdd( &tor->completion, block );
1082            decrementPieceRequests( t, e->pieceIndex );
1083
1084            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1085
1086            if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1087            {
1088                const tr_piece_index_t p = e->pieceIndex;
1089                const tr_bool ok = tr_ioTestPiece( tor, p, NULL, 0 );
1090
1091                if( !ok )
1092                {
1093                    tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1094                               (unsigned long)p );
1095                }
1096
1097                tr_torrentSetHasPiece( tor, p, ok );
1098                tr_torrentSetPieceChecked( tor, p, TRUE );
1099                tr_peerMgrSetBlame( tor->session->peerMgr, tor->info.hash, p, ok );
1100
1101                if( !ok )
1102                    gotBadPiece( t, p );
1103                else
1104                {
1105                    int i;
1106                    int peerCount = TR_PTR_ARRAY_LENGTH( &t->peers );
1107                    tr_peer ** peers = (tr_peer**) TR_PTR_ARRAY_DATA( &t->peers );
1108                    for( i=0; i<peerCount; ++i )
1109                        tr_peerMsgsHave( peers[i]->msgs, p );
1110                }
1111
1112                tr_torrentRecheckCompleteness( tor );
1113            }
1114            break;
1115        }
1116
1117        case TR_PEER_ERROR:
1118            if( e->err == EINVAL )
1119            {
1120                addStrike( t, peer );
1121                peer->doPurge = 1;
1122                tordbg( t, "setting %s doPurge flag because we got an EINVAL error", tr_peerIoAddrStr( &peer->addr, peer->port ) );
1123            }
1124            else if( ( e->err == ERANGE )
1125                  || ( e->err == EMSGSIZE )
1126                  || ( e->err == ENOTCONN ) )
1127            {
1128                /* some protocol error from the peer */
1129                peer->doPurge = 1;
1130                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error", tr_peerIoAddrStr( &peer->addr, peer->port ) );
1131            }
1132            else /* a local error, such as an IO error */
1133            {
1134                t->tor->error = e->err;
1135                tr_strlcpy( t->tor->errorString,
1136                            tr_strerror( t->tor->error ),
1137                            sizeof( t->tor->errorString ) );
1138                tr_torrentStop( t->tor );
1139            }
1140            break;
1141
1142        default:
1143            assert( 0 );
1144    }
1145
1146    torrentUnlock( t );
1147}
1148
1149static void
1150ensureAtomExists( Torrent          * t,
1151                  const tr_address * addr,
1152                  tr_port            port,
1153                  uint8_t            flags,
1154                  uint8_t            from )
1155{
1156    if( getExistingAtom( t, addr ) == NULL )
1157    {
1158        struct peer_atom * a;
1159        a = tr_new0( struct peer_atom, 1 );
1160        a->addr = *addr;
1161        a->port = port;
1162        a->flags = flags;
1163        a->from = from;
1164        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr( &a->addr, a->port ) );
1165        tr_ptrArrayInsertSorted( &t->pool, a, comparePeerAtoms );
1166    }
1167}
1168
1169static int
1170getMaxPeerCount( const tr_torrent * tor )
1171{
1172    return tor->maxConnectedPeers;
1173}
1174
1175static int
1176getPeerCount( const Torrent * t )
1177{
1178    return tr_ptrArraySize( &t->peers ) + tr_ptrArraySize( &t->outgoingHandshakes );
1179}
1180
1181/* FIXME: this is kind of a mess. */
1182static tr_bool
1183myHandshakeDoneCB( tr_handshake  * handshake,
1184                   tr_peerIo     * io,
1185                   int             isConnected,
1186                   const uint8_t * peer_id,
1187                   void          * vmanager )
1188{
1189    tr_bool            ok = isConnected;
1190    tr_bool            success = FALSE;
1191    tr_port            port;
1192    const tr_address * addr;
1193    tr_peerMgr       * manager = vmanager;
1194    Torrent          * t;
1195    tr_handshake     * ours;
1196
1197    assert( io );
1198    assert( isConnected == 0 || isConnected == 1 );
1199
1200    t = tr_peerIoHasTorrentHash( io )
1201        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1202        : NULL;
1203
1204    if( tr_peerIoIsIncoming ( io ) )
1205        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1206                                        handshake, handshakeCompare );
1207    else if( t )
1208        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1209                                        handshake, handshakeCompare );
1210    else
1211        ours = handshake;
1212
1213    assert( ours );
1214    assert( ours == handshake );
1215
1216    if( t )
1217        torrentLock( t );
1218
1219    addr = tr_peerIoGetAddress( io, &port );
1220
1221    if( !ok || !t || !t->isRunning )
1222    {
1223        if( t )
1224        {
1225            struct peer_atom * atom = getExistingAtom( t, addr );
1226            if( atom )
1227                ++atom->numFails;
1228        }
1229    }
1230    else /* looking good */
1231    {
1232        struct peer_atom * atom;
1233        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1234        atom = getExistingAtom( t, addr );
1235        atom->time = time( NULL );
1236        atom->piece_data_time = 0;
1237
1238        if( atom->myflags & MYFLAG_BANNED )
1239        {
1240            tordbg( t, "banned peer %s tried to reconnect",
1241                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
1242        }
1243        else if( tr_peerIoIsIncoming( io )
1244               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1245
1246        {
1247        }
1248        else
1249        {
1250            tr_peer * peer = getExistingPeer( t, addr );
1251
1252            if( peer ) /* we already have this peer */
1253            {
1254            }
1255            else
1256            {
1257                peer = getPeer( t, addr );
1258                tr_free( peer->client );
1259
1260                if( !peer_id )
1261                    peer->client = NULL;
1262                else {
1263                    char client[128];
1264                    tr_clientForId( client, sizeof( client ), peer_id );
1265                    peer->client = tr_strdup( client );
1266                }
1267
1268                peer->port = port;
1269                peer->io = tr_handshakeStealIO( handshake );
1270                tr_peerIoSetParent( peer->io, t->tor->bandwidth );
1271                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1272
1273                success = TRUE;
1274            }
1275        }
1276    }
1277
1278    if( !success )
1279        tr_ptrArrayAppend( &manager->finishedHandshakes, handshake );
1280
1281    if( t )
1282        torrentUnlock( t );
1283
1284    return success;
1285}
1286
1287void
1288tr_peerMgrAddIncoming( tr_peerMgr * manager,
1289                       tr_address * addr,
1290                       tr_port      port,
1291                       int          socket )
1292{
1293    managerLock( manager );
1294
1295    if( tr_sessionIsAddressBlocked( manager->session, addr ) )
1296    {
1297        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
1298        tr_netClose( socket );
1299    }
1300    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
1301    {
1302        tr_netClose( socket );
1303    }
1304    else /* we don't have a connetion to them yet... */
1305    {
1306        tr_peerIo *    io;
1307        tr_handshake * handshake;
1308
1309        io = tr_peerIoNewIncoming( manager->session, manager->session->bandwidth, addr, port, socket );
1310
1311        handshake = tr_handshakeNew( io,
1312                                     manager->session->encryptionMode,
1313                                     myHandshakeDoneCB,
1314                                     manager );
1315
1316        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
1317                                 handshakeCompare );
1318    }
1319
1320    managerUnlock( manager );
1321}
1322
1323static tr_bool
1324tr_isPex( const tr_pex * pex )
1325{
1326    return pex && tr_isAddress( &pex->addr );
1327}
1328
1329void
1330tr_peerMgrAddPex( tr_peerMgr *    manager,
1331                  const uint8_t * torrentHash,
1332                  uint8_t         from,
1333                  const tr_pex *  pex )
1334{
1335    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
1336    {
1337        Torrent * t;
1338        managerLock( manager );
1339
1340        t = getExistingTorrent( manager, torrentHash );
1341        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
1342            ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from );
1343
1344        managerUnlock( manager );
1345    }
1346}
1347
1348tr_pex *
1349tr_peerMgrCompactToPex( const void *    compact,
1350                        size_t          compactLen,
1351                        const uint8_t * added_f,
1352                        size_t          added_f_len,
1353                        size_t *        pexCount )
1354{
1355    size_t          i;
1356    size_t          n = compactLen / 6;
1357    const uint8_t * walk = compact;
1358    tr_pex *        pex = tr_new0( tr_pex, n );
1359
1360    for( i = 0; i < n; ++i )
1361    {
1362        pex[i].addr.type = TR_AF_INET;
1363        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
1364        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1365        if( added_f && ( n == added_f_len ) )
1366            pex[i].flags = added_f[i];
1367    }
1368
1369    *pexCount = n;
1370    return pex;
1371}
1372
1373tr_pex *
1374tr_peerMgrCompact6ToPex( const void    * compact,
1375                         size_t          compactLen,
1376                         const uint8_t * added_f,
1377                         size_t          added_f_len,
1378                         size_t        * pexCount )
1379{
1380    size_t          i;
1381    size_t          n = compactLen / 18;
1382    const uint8_t * walk = compact;
1383    tr_pex *        pex = tr_new0( tr_pex, n );
1384   
1385    for( i = 0; i < n; ++i )
1386    {
1387        pex[i].addr.type = TR_AF_INET6;
1388        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
1389        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1390        if( added_f && ( n == added_f_len ) )
1391            pex[i].flags = added_f[i];
1392    }
1393   
1394    *pexCount = n;
1395    return pex;
1396}
1397
1398tr_pex *
1399tr_peerMgrArrayToPex( const void * array,
1400                      size_t       arrayLen,
1401                      size_t      * pexCount )
1402{
1403    size_t          i;
1404    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
1405    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
1406    const uint8_t * walk = array;
1407    tr_pex        * pex = tr_new0( tr_pex, n );
1408   
1409    for( i = 0 ; i < n ; i++ ) {
1410        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
1411        tr_suspectAddress( &pex[i].addr, "tracker"  );
1412        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
1413        pex[i].flags = 0x00;
1414        walk += sizeof( tr_address ) + 2;
1415    }
1416   
1417    *pexCount = n;
1418    return pex;
1419}
1420
1421/**
1422***
1423**/
1424
1425void
1426tr_peerMgrSetBlame( tr_peerMgr *     manager,
1427                    const uint8_t *  torrentHash,
1428                    tr_piece_index_t pieceIndex,
1429                    int              success )
1430{
1431    if( !success )
1432    {
1433        int        peerCount, i;
1434        Torrent *  t = getExistingTorrent( manager, torrentHash );
1435        tr_peer ** peers;
1436
1437        assert( torrentIsLocked( t ) );
1438
1439        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
1440        for( i = 0; i < peerCount; ++i )
1441        {
1442            tr_peer * peer = peers[i];
1443            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1444            {
1445                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1446                        tr_peerIoAddrStr( &peer->addr, peer->port ),
1447                        pieceIndex, (int)peer->strikes + 1 );
1448                addStrike( t, peer );
1449            }
1450        }
1451    }
1452}
1453
1454int
1455tr_pexCompare( const void * va, const void * vb )
1456{
1457    const tr_pex * a = va;
1458    const tr_pex * b = vb;
1459    int i;
1460
1461    assert( tr_isPex( a ) );
1462    assert( tr_isPex( b ) );
1463
1464    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
1465        return i;
1466
1467    if( a->port != b->port )
1468        return a->port < b->port ? -1 : 1;
1469
1470    return 0;
1471}
1472
1473static int
1474peerPrefersCrypto( const tr_peer * peer )
1475{
1476    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1477        return TRUE;
1478
1479    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1480        return FALSE;
1481
1482    return tr_peerIoIsEncrypted( peer->io );
1483}
1484
1485int
1486tr_peerMgrGetPeers( tr_peerMgr      * manager,
1487                    const uint8_t   * torrentHash,
1488                    tr_pex         ** setme_pex,
1489                    uint8_t           af)
1490{
1491    int peersReturning = 0;
1492    const Torrent *  t;
1493
1494    managerLock( manager );
1495
1496    t = getExistingTorrent( manager, torrentHash );
1497    if( t == NULL )
1498    {
1499        *setme_pex = NULL;
1500    }
1501    else
1502    {
1503        int i;
1504        const tr_peer ** peers = (const tr_peer**) TR_PTR_ARRAY_DATA( &t->peers );
1505        const int peerCount = TR_PTR_ARRAY_LENGTH( &t->peers );
1506        /* for now, this will waste memory on torrents that have both
1507         * ipv6 and ipv4 peers */
1508        tr_pex * pex = tr_new( tr_pex, peerCount );
1509        tr_pex * walk = pex;
1510
1511        for( i=0; i<peerCount; ++i )
1512        {
1513            const tr_peer * peer = peers[i];
1514            if( peer->addr.type == af )
1515            {
1516                const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1517
1518                assert( tr_isAddress( &peer->addr ) );
1519                walk->addr = peer->addr;
1520                walk->port = peer->port;
1521                walk->flags = 0;
1522                if( peerPrefersCrypto( peer ) )
1523                    walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1524                if( ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 ) )
1525                    walk->flags |= ADDED_F_SEED_FLAG;
1526                ++peersReturning;
1527                ++walk;
1528            }
1529        }
1530
1531        assert( ( walk - pex ) == peersReturning );
1532        qsort( pex, peersReturning, sizeof( tr_pex ), tr_pexCompare );
1533        *setme_pex = pex;
1534    }
1535
1536    managerUnlock( manager );
1537    return peersReturning;
1538}
1539
1540static int reconnectPulse( void * vtorrent );
1541
1542static int rechokePulse( void * vtorrent );
1543
1544void
1545tr_peerMgrStartTorrent( tr_peerMgr *    manager,
1546                        const uint8_t * torrentHash )
1547{
1548    Torrent * t;
1549
1550    managerLock( manager );
1551
1552    t = getExistingTorrent( manager, torrentHash );
1553
1554    assert( t );
1555    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1556    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1557
1558    if( !t->isRunning )
1559    {
1560        t->isRunning = 1;
1561
1562        t->reconnectTimer = tr_timerNew( t->manager->session,
1563                                         reconnectPulse, t,
1564                                         RECONNECT_PERIOD_MSEC );
1565
1566        t->rechokeTimer = tr_timerNew( t->manager->session,
1567                                       rechokePulse, t,
1568                                       RECHOKE_PERIOD_MSEC );
1569
1570        reconnectPulse( t );
1571
1572        rechokePulse( t );
1573
1574        if( !tr_ptrArrayEmpty( &t->webseeds ) )
1575            refillSoon( t );
1576    }
1577
1578    managerUnlock( manager );
1579}
1580
1581static void
1582stopTorrent( Torrent * t )
1583{
1584    assert( torrentIsLocked( t ) );
1585
1586    t->isRunning = 0;
1587    tr_timerFree( &t->rechokeTimer );
1588    tr_timerFree( &t->reconnectTimer );
1589
1590    /* disconnect the peers. */
1591    tr_ptrArrayForeach( &t->peers, (PtrArrayForeachFunc)peerDestructor );
1592    tr_ptrArrayClear( &t->peers );
1593
1594    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1595     * which removes the handshake from t->outgoingHandshakes... */
1596    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
1597        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
1598}
1599
1600void
1601tr_peerMgrStopTorrent( tr_peerMgr *    manager,
1602                       const uint8_t * torrentHash )
1603{
1604    managerLock( manager );
1605
1606    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1607
1608    managerUnlock( manager );
1609}
1610
1611void
1612tr_peerMgrAddTorrent( tr_peerMgr * manager,
1613                      tr_torrent * tor )
1614{
1615    Torrent * t;
1616
1617    managerLock( manager );
1618
1619    assert( tor );
1620    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1621
1622    t = torrentConstructor( manager, tor );
1623    tr_ptrArrayInsertSorted( &manager->torrents, t, torrentCompare );
1624
1625    managerUnlock( manager );
1626}
1627
1628void
1629tr_peerMgrRemoveTorrent( tr_peerMgr *    manager,
1630                         const uint8_t * torrentHash )
1631{
1632    Torrent * t;
1633
1634    managerLock( manager );
1635
1636    t = getExistingTorrent( manager, torrentHash );
1637    assert( t );
1638    stopTorrent( t );
1639    tr_ptrArrayRemoveSorted( &manager->torrents, t, torrentCompare );
1640    torrentDestructor( t );
1641
1642    managerUnlock( manager );
1643}
1644
1645void
1646tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1647                               const uint8_t *    torrentHash,
1648                               int8_t *           tab,
1649                               unsigned int       tabCount )
1650{
1651    tr_piece_index_t   i;
1652    const Torrent *    t;
1653    const tr_torrent * tor;
1654    float              interval;
1655    tr_bool            isSeed;
1656    int                peerCount;
1657    const tr_peer **   peers;
1658    managerLock( manager );
1659
1660    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1661    tor = t->tor;
1662    interval = tor->info.pieceCount / (float)tabCount;
1663    isSeed = tor && ( tr_cpGetStatus ( &tor->completion ) == TR_SEED );
1664    peers = (const tr_peer **) TR_PTR_ARRAY_DATA( &t->peers );
1665    peerCount = TR_PTR_ARRAY_LENGTH( &t->peers );
1666
1667    memset( tab, 0, tabCount );
1668
1669    for( i = 0; tor && i < tabCount; ++i )
1670    {
1671        const int piece = i * interval;
1672
1673        if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
1674            tab[i] = -1;
1675        else if( peerCount ) {
1676            int j;
1677            for( j = 0; j < peerCount; ++j )
1678                if( tr_bitfieldHas( peers[j]->have, i ) )
1679                    ++tab[i];
1680        }
1681    }
1682
1683    managerUnlock( manager );
1684}
1685
1686/* Returns the pieces that are available from peers */
1687tr_bitfield*
1688tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1689                        const uint8_t *    torrentHash )
1690{
1691    int i;
1692    int peerCount;
1693    Torrent * t;
1694    const tr_peer ** peers;
1695    tr_bitfield * pieces;
1696    managerLock( manager );
1697
1698    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1699    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1700    peerCount = TR_PTR_ARRAY_LENGTH( &t->peers );
1701    peers = (const tr_peer**) TR_PTR_ARRAY_DATA( &t->peers );
1702    for( i=0; i<peerCount; ++i )
1703        tr_bitfieldOr( pieces, peers[i]->have );
1704
1705    managerUnlock( manager );
1706    return pieces;
1707}
1708
1709int
1710tr_peerMgrHasConnections( const tr_peerMgr * manager,
1711                          const uint8_t *    torrentHash )
1712{
1713    int ret;
1714    const Torrent * t;
1715    managerLock( manager );
1716
1717    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1718    ret = t && ( !tr_ptrArrayEmpty( &t->peers ) || !tr_ptrArrayEmpty( &t->webseeds ) );
1719
1720    managerUnlock( manager );
1721    return ret;
1722}
1723
1724void
1725tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1726                        const uint8_t    * torrentHash,
1727                        int              * setmePeersKnown,
1728                        int              * setmePeersConnected,
1729                        int              * setmeSeedsConnected,
1730                        int              * setmeWebseedsSendingToUs,
1731                        int              * setmePeersSendingToUs,
1732                        int              * setmePeersGettingFromUs,
1733                        int              * setmePeersFrom )
1734{
1735    int i, size;
1736    const Torrent * t;
1737    const tr_peer ** peers;
1738    const tr_webseed ** webseeds;
1739
1740    managerLock( manager );
1741
1742    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1743    peers = (const tr_peer **) TR_PTR_ARRAY_DATA( &t->peers );
1744    size = TR_PTR_ARRAY_LENGTH( &t->peers );
1745
1746    *setmePeersKnown           = tr_ptrArraySize( &t->pool );
1747    *setmePeersConnected       = 0;
1748    *setmeSeedsConnected       = 0;
1749    *setmePeersGettingFromUs   = 0;
1750    *setmePeersSendingToUs     = 0;
1751    *setmeWebseedsSendingToUs  = 0;
1752
1753    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1754        setmePeersFrom[i] = 0;
1755
1756    for( i=0; i<size; ++i )
1757    {
1758        const tr_peer * peer = peers[i];
1759        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1760
1761        if( peer->io == NULL ) /* not connected */
1762            continue;
1763
1764        ++*setmePeersConnected;
1765
1766        ++setmePeersFrom[atom->from];
1767
1768        if( clientIsDownloadingFrom( peer ) )
1769            ++*setmePeersSendingToUs;
1770
1771        if( clientIsUploadingTo( peer ) )
1772            ++*setmePeersGettingFromUs;
1773
1774        if( atom->flags & ADDED_F_SEED_FLAG )
1775            ++*setmeSeedsConnected;
1776    }
1777
1778    webseeds = (const tr_webseed**) TR_PTR_ARRAY_DATA( &t->webseeds );
1779    size = TR_PTR_ARRAY_LENGTH( &t->webseeds );
1780    for( i=0; i<size; ++i )
1781        if( tr_webseedIsActive( webseeds[i] ) )
1782            ++*setmeWebseedsSendingToUs;
1783
1784    managerUnlock( manager );
1785}
1786
1787float*
1788tr_peerMgrWebSpeeds( const tr_peerMgr * manager,
1789                     const uint8_t *    torrentHash )
1790{
1791    const Torrent * t;
1792    const tr_webseed ** webseeds;
1793    int i;
1794    int webseedCount;
1795    float * ret;
1796
1797    assert( manager );
1798    managerLock( manager );
1799
1800    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1801    webseeds = (const tr_webseed**) TR_PTR_ARRAY_DATA( &t->webseeds );
1802    webseedCount = TR_PTR_ARRAY_LENGTH( &t->webseeds );
1803    assert( webseedCount == t->tor->info.webseedCount );
1804    ret = tr_new0( float, webseedCount );
1805
1806    for( i=0; i<webseedCount; ++i )
1807        if( !tr_webseedGetSpeed( webseeds[i], &ret[i] ) )
1808            ret[i] = -1.0;
1809
1810    managerUnlock( manager );
1811    return ret;
1812}
1813
1814double
1815tr_peerGetPieceSpeed( const tr_peer * peer, tr_direction direction )
1816{
1817    return peer->io ? tr_peerIoGetPieceSpeed( peer->io, direction ) : 0.0;
1818}
1819
1820
1821struct tr_peer_stat *
1822tr_peerMgrPeerStats( const   tr_peerMgr  * manager,
1823                     const   uint8_t     * torrentHash,
1824                     int                 * setmeCount )
1825{
1826    int i, size;
1827    const Torrent * t;
1828    const tr_peer ** peers;
1829    tr_peer_stat * ret;
1830
1831    assert( manager );
1832    managerLock( manager );
1833
1834    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1835    size = TR_PTR_ARRAY_LENGTH( &t->peers );
1836    peers = (const tr_peer**) TR_PTR_ARRAY_DATA( &t->peers );
1837    ret = tr_new0( tr_peer_stat, size );
1838
1839    for( i = 0; i < size; ++i )
1840    {
1841        char *                   pch;
1842        const tr_peer *          peer = peers[i];
1843        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1844        tr_peer_stat *           stat = ret + i;
1845        tr_address               norm_addr;
1846
1847        norm_addr = peer->addr;
1848        tr_normalizeV4Mapped( &norm_addr );
1849        tr_ntop( &norm_addr, stat->addr, sizeof( stat->addr ) );
1850        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
1851                   sizeof( stat->client ) );
1852        stat->port               = ntohs( peer->port );
1853        stat->from               = atom->from;
1854        stat->progress           = peer->progress;
1855        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1856        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER );
1857        stat->rateToClient       = tr_peerGetPieceSpeed( peer, TR_PEER_TO_CLIENT );
1858        stat->peerIsChoked       = peer->peerIsChoked;
1859        stat->peerIsInterested   = peer->peerIsInterested;
1860        stat->clientIsChoked     = peer->clientIsChoked;
1861        stat->clientIsInterested = peer->clientIsInterested;
1862        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1863        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1864        stat->isUploadingTo      = clientIsUploadingTo( peer );
1865        stat->isSeed             = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
1866
1867        pch = stat->flagStr;
1868        if( t->optimistic == peer ) *pch++ = 'O';
1869        if( stat->isDownloadingFrom ) *pch++ = 'D';
1870        else if( stat->clientIsInterested ) *pch++ = 'd';
1871        if( stat->isUploadingTo ) *pch++ = 'U';
1872        else if( stat->peerIsInterested ) *pch++ = 'u';
1873        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
1874        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1875        if( stat->isEncrypted ) *pch++ = 'E';
1876        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1877        if( stat->isIncoming ) *pch++ = 'I';
1878        *pch = '\0';
1879    }
1880
1881    *setmeCount = size;
1882
1883    managerUnlock( manager );
1884    return ret;
1885}
1886
1887/**
1888***
1889**/
1890
1891struct ChokeData
1892{
1893    tr_bool         doUnchoke;
1894    tr_bool         isInterested;
1895    tr_bool         isChoked;
1896    int             rate;
1897    tr_peer *       peer;
1898};
1899
1900static int
1901compareChoke( const void * va,
1902              const void * vb )
1903{
1904    const struct ChokeData * a = va;
1905    const struct ChokeData * b = vb;
1906
1907    if( a->rate != b->rate ) /* prefer higher overall speeds */
1908        return a->rate > b->rate ? -1 : 1;
1909
1910    if( a->isChoked != b->isChoked ) /* prefer unchoked */
1911        return a->isChoked ? 1 : -1;
1912
1913    return 0;
1914}
1915
1916static int
1917isNew( const tr_peer * peer )
1918{
1919    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1920}
1921
1922static int
1923isSame( const tr_peer * peer )
1924{
1925    return peer && peer->client && strstr( peer->client, "Transmission" );
1926}
1927
1928/**
1929***
1930**/
1931
1932static void
1933rechoke( Torrent * t )
1934{
1935    int i, size, unchokedInterested;
1936    const int peerCount = TR_PTR_ARRAY_LENGTH( &t->peers );
1937    tr_peer ** peers = (tr_peer**) TR_PTR_ARRAY_DATA( &t->peers );
1938    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1939    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
1940
1941    assert( torrentIsLocked( t ) );
1942
1943    /* sort the peers by preference and rate */
1944    for( i = 0, size = 0; i < peerCount; ++i )
1945    {
1946        tr_peer * peer = peers[i];
1947        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1948
1949        if( peer->progress >= 1.0 ) /* choke all seeds */
1950        {
1951            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1952        }
1953        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
1954        {
1955            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1956        }
1957        else if( chokeAll ) /* choke everyone if we're not uploading */
1958        {
1959            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1960        }
1961        else
1962        {
1963            struct ChokeData * n = &choke[size++];
1964            n->peer         = peer;
1965            n->isInterested = peer->peerIsInterested;
1966            n->isChoked     = peer->peerIsChoked;
1967            n->rate         = tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER ) * 1024;
1968        }
1969    }
1970
1971    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
1972
1973    /**
1974     * Reciprocation and number of uploads capping is managed by unchoking
1975     * the N peers which have the best upload rate and are interested.
1976     * This maximizes the client's download rate. These N peers are
1977     * referred to as downloaders, because they are interested in downloading
1978     * from the client.
1979     *
1980     * Peers which have a better upload rate (as compared to the downloaders)
1981     * but aren't interested get unchoked. If they become interested, the
1982     * downloader with the worst upload rate gets choked. If a client has
1983     * a complete file, it uses its upload rate rather than its download
1984     * rate to decide which peers to unchoke.
1985     */
1986    unchokedInterested = 0;
1987    for( i=0; i<size && unchokedInterested<MAX_UNCHOKED_PEERS; ++i ) {
1988        choke[i].doUnchoke = 1;
1989        if( choke[i].isInterested )
1990            ++unchokedInterested;
1991    }
1992
1993    /* optimistic unchoke */
1994    if( i < size )
1995    {
1996        int n;
1997        struct ChokeData * c;
1998        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
1999
2000        for( ; i<size; ++i )
2001        {
2002            if( choke[i].isInterested )
2003            {
2004                const tr_peer * peer = choke[i].peer;
2005                int x = 1, y;
2006                if( isNew( peer ) ) x *= 3;
2007                if( isSame( peer ) ) x *= 3;
2008                for( y=0; y<x; ++y )
2009                    tr_ptrArrayAppend( &randPool, &choke[i] );
2010            }
2011        }
2012
2013        if(( n = tr_ptrArraySize( &randPool )))
2014        {
2015            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
2016            c->doUnchoke = 1;
2017            t->optimistic = c->peer;
2018        }
2019
2020        tr_ptrArrayDestruct( &randPool, NULL );
2021    }
2022
2023    for( i=0; i<size; ++i )
2024        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
2025
2026    /* cleanup */
2027    tr_free( choke );
2028}
2029
2030static int
2031rechokePulse( void * vtorrent )
2032{
2033    Torrent * t = vtorrent;
2034
2035    torrentLock( t );
2036    rechoke( t );
2037    torrentUnlock( t );
2038    return TRUE;
2039}
2040
2041/***
2042****
2043****  Life and Death
2044****
2045***/
2046
2047static int
2048shouldPeerBeClosed( const Torrent * t,
2049                    const tr_peer * peer,
2050                    int             peerCount )
2051{
2052    const tr_torrent *       tor = t->tor;
2053    const time_t             now = time( NULL );
2054    const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
2055
2056    /* if it's marked for purging, close it */
2057    if( peer->doPurge )
2058    {
2059        tordbg( t, "purging peer %s because its doPurge flag is set",
2060                tr_peerIoAddrStr( &atom->addr, atom->port ) );
2061        return TRUE;
2062    }
2063
2064    /* if we're seeding and the peer has everything we have,
2065     * and enough time has passed for a pex exchange, then disconnect */
2066    if( tr_torrentIsSeed( tor ) )
2067    {
2068        int peerHasEverything;
2069        if( atom->flags & ADDED_F_SEED_FLAG )
2070            peerHasEverything = TRUE;
2071        else if( peer->progress < tr_cpPercentDone( &tor->completion ) )
2072            peerHasEverything = FALSE;
2073        else {
2074            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( &tor->completion ) );
2075            tr_bitfieldDifference( tmp, peer->have );
2076            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2077            tr_bitfieldFree( tmp );
2078        }
2079
2080        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 )))
2081        {
2082            tordbg( t, "purging peer %s because we're both seeds",
2083                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
2084            return TRUE;
2085        }
2086    }
2087
2088    /* disconnect if it's been too long since piece data has been transferred.
2089     * this is on a sliding scale based on number of available peers... */
2090    {
2091        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2092        /* if we have >= relaxIfFewerThan, strictness is 100%.
2093         * if we have zero connections, strictness is 0% */
2094        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
2095            ? 1.0
2096            : peerCount / (float)relaxStrictnessIfFewerThanN;
2097        const int lo = MIN_UPLOAD_IDLE_SECS;
2098        const int hi = MAX_UPLOAD_IDLE_SECS;
2099        const int limit = hi - ( ( hi - lo ) * strictness );
2100        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
2101/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
2102        if( idleTime > limit ) {
2103            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
2104                       tr_peerIoAddrStr( &atom->addr, atom->port ), idleTime );
2105            return TRUE;
2106        }
2107    }
2108
2109    return FALSE;
2110}
2111
2112static tr_peer **
2113getPeersToClose( Torrent * t, int * setmeSize )
2114{
2115    int i, peerCount, outsize;
2116    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
2117    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2118
2119    assert( torrentIsLocked( t ) );
2120
2121    for( i = outsize = 0; i < peerCount; ++i )
2122        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
2123            ret[outsize++] = peers[i];
2124
2125    *setmeSize = outsize;
2126    return ret;
2127}
2128
2129static int
2130compareCandidates( const void * va,
2131                   const void * vb )
2132{
2133    const struct peer_atom * a = *(const struct peer_atom**) va;
2134    const struct peer_atom * b = *(const struct peer_atom**) vb;
2135
2136    /* <Charles> Here we would probably want to try reconnecting to
2137     * peers that had most recently given us data. Lots of users have
2138     * trouble with resets due to their routers and/or ISPs. This way we
2139     * can quickly recover from an unwanted reset. So we sort
2140     * piece_data_time in descending order.
2141     */
2142
2143    if( a->piece_data_time != b->piece_data_time )
2144        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2145
2146    if( a->numFails != b->numFails )
2147        return a->numFails < b->numFails ? -1 : 1;
2148
2149    if( a->time != b->time )
2150        return a->time < b->time ? -1 : 1;
2151
2152    /* all other things being equal, prefer peers whose
2153     * information comes from a more reliable source */
2154    if( a->from != b->from )
2155        return a->from < b->from ? -1 : 1;
2156
2157    return 0;
2158}
2159
2160static int
2161getReconnectIntervalSecs( const struct peer_atom * atom )
2162{
2163    int          sec;
2164    const time_t now = time( NULL );
2165
2166    /* if we were recently connected to this peer and transferring piece
2167     * data, try to reconnect to them sooner rather that later -- we don't
2168     * want network troubles to get in the way of a good peer. */
2169    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2170        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2171
2172    /* don't allow reconnects more often than our minimum */
2173    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2174        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2175
2176    /* otherwise, the interval depends on how many times we've tried
2177     * and failed to connect to the peer */
2178    else switch( atom->numFails ) {
2179        case 0: sec = 0; break;
2180        case 1: sec = 5; break;
2181        case 2: sec = 2 * 60; break;
2182        case 3: sec = 15 * 60; break;
2183        case 4: sec = 30 * 60; break;
2184        case 5: sec = 60 * 60; break;
2185        default: sec = 120 * 60; break;
2186    }
2187
2188    return sec;
2189}
2190
2191static struct peer_atom **
2192getPeerCandidates( Torrent * t, int * setmeSize )
2193{
2194    int                 i, atomCount, retCount;
2195    struct peer_atom ** atoms;
2196    struct peer_atom ** ret;
2197    const time_t        now = time( NULL );
2198    const int           seed = tr_torrentIsSeed( t->tor );
2199
2200    assert( torrentIsLocked( t ) );
2201
2202    atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
2203    ret = tr_new( struct peer_atom*, atomCount );
2204    for( i = retCount = 0; i < atomCount; ++i )
2205    {
2206        int                interval;
2207        struct peer_atom * atom = atoms[i];
2208
2209        /* peer fed us too much bad data ... we only keep it around
2210         * now to weed it out in case someone sends it to us via pex */
2211        if( atom->myflags & MYFLAG_BANNED )
2212            continue;
2213
2214        /* peer was unconnectable before, so we're not going to keep trying.
2215         * this is needs a separate flag from `banned', since if they try
2216         * to connect to us later, we'll let them in */
2217        if( atom->myflags & MYFLAG_UNREACHABLE )
2218            continue;
2219
2220        /* we don't need two connections to the same peer... */
2221        if( peerIsInUse( t, &atom->addr ) )
2222            continue;
2223
2224        /* no need to connect if we're both seeds... */
2225        if( seed && ( ( atom->flags & ADDED_F_SEED_FLAG ) ||
2226                      ( atom->uploadOnly == UPLOAD_ONLY_YES ) ) )
2227            continue;
2228
2229        /* don't reconnect too often */
2230        interval = getReconnectIntervalSecs( atom );
2231        if( ( now - atom->time ) < interval )
2232        {
2233            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2234                    i, tr_peerIoAddrStr( &atom->addr, atom->port ), interval );
2235            continue;
2236        }
2237
2238        /* Don't connect to peers in our blocklist */
2239        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2240            continue;
2241
2242        ret[retCount++] = atom;
2243    }
2244
2245    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2246    *setmeSize = retCount;
2247    return ret;
2248}
2249
2250static int
2251reconnectPulse( void * vtorrent )
2252{
2253    Torrent *     t = vtorrent;
2254    static time_t prevTime = 0;
2255    static int    newConnectionsThisSecond = 0;
2256    time_t        now;
2257
2258    torrentLock( t );
2259
2260    now = time( NULL );
2261    if( prevTime != now )
2262    {
2263        prevTime = now;
2264        newConnectionsThisSecond = 0;
2265    }
2266
2267    if( !t->isRunning )
2268    {
2269        removeAllPeers( t );
2270    }
2271    else
2272    {
2273        int i, nCandidates, nBad;
2274        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
2275        struct tr_peer ** connections = getPeersToClose( t, &nBad );
2276
2277        if( nBad || nCandidates )
2278            tordbg( t, "reconnect pulse for [%s]: %d bad connections, "
2279                    "%d connection candidates, %d atoms, max per pulse is %d",
2280                    t->tor->info.name, nBad, nCandidates,
2281                    tr_ptrArraySize( &t->pool ),
2282                    (int)MAX_RECONNECTIONS_PER_PULSE );
2283
2284        /* disconnect some peers.
2285           if we transferred piece data, then they might be good peers,
2286           so reset their `numFails' weight to zero.  otherwise we connected
2287           to them fruitlessly, so mark it as another fail */
2288        for( i = 0; i < nBad; ++i ) {
2289            tr_peer * peer = connections[i];
2290            struct peer_atom * atom = getExistingAtom( t, &peer->addr );
2291            if( atom->piece_data_time )
2292                atom->numFails = 0;
2293            else
2294                ++atom->numFails;
2295            tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
2296            removePeer( t, peer );
2297        }
2298
2299        /* add some new ones */
2300        for( i = 0;    ( i < nCandidates )
2301           && ( i < MAX_RECONNECTIONS_PER_PULSE )
2302           && ( getPeerCount( t ) < getMaxPeerCount( t->tor ) )
2303           && ( newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND ); ++i )
2304        {
2305            tr_peerMgr *       mgr = t->manager;
2306            struct peer_atom * atom = candidates[i];
2307            tr_peerIo *        io;
2308
2309            tordbg( t, "Starting an OUTGOING connection with %s",
2310                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2311
2312            io = tr_peerIoNewOutgoing( mgr->session, mgr->session->bandwidth, &atom->addr, atom->port, t->hash );
2313
2314            if( io == NULL )
2315            {
2316                tordbg( t, "peerIo not created; marking peer %s as unreachable",
2317                        tr_peerIoAddrStr( &atom->addr, atom->port ) );
2318                atom->myflags |= MYFLAG_UNREACHABLE;
2319            }
2320            else
2321            {
2322                tr_handshake * handshake = tr_handshakeNew( io,
2323                                                            mgr->session->encryptionMode,
2324                                                            myHandshakeDoneCB,
2325                                                            mgr );
2326
2327                assert( tr_peerIoGetTorrentHash( io ) );
2328
2329                ++newConnectionsThisSecond;
2330
2331                tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
2332                                         handshakeCompare );
2333            }
2334
2335            atom->time = time( NULL );
2336        }
2337
2338        /* cleanup */
2339        tr_free( connections );
2340        tr_free( candidates );
2341    }
2342
2343    torrentUnlock( t );
2344    return TRUE;
2345}
2346
2347/****
2348*****
2349*****  BANDWIDTH ALLOCATION
2350*****
2351****/
2352
2353static void
2354pumpAllPeers( tr_peerMgr * mgr )
2355{
2356    const int torrentCount = tr_ptrArraySize( &mgr->torrents );
2357    int       i, j;
2358
2359    for( i=0; i<torrentCount; ++i )
2360    {
2361        Torrent * t = tr_ptrArrayNth( &mgr->torrents, i );
2362        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
2363        {
2364            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
2365            tr_peerMsgsPulse( peer->msgs );
2366        }
2367    }
2368}
2369
2370static int
2371bandwidthPulse( void * vmgr )
2372{
2373    tr_handshake * handshake;
2374    tr_peerMgr * mgr = vmgr;
2375    managerLock( mgr );
2376
2377    /* FIXME: this next line probably isn't necessary... */
2378    pumpAllPeers( mgr );
2379
2380    /* allocate bandwidth to the peers */
2381    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
2382    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
2383
2384    /* free all the finished handshakes */
2385    while(( handshake = tr_ptrArrayPop( &mgr->finishedHandshakes )))
2386        tr_handshakeFree( handshake );
2387
2388    managerUnlock( mgr );
2389    return TRUE;
2390}
Note: See TracBrowser for help on using the repository browser.