source: trunk/libtransmission/peer-mgr.c @ 7578

Last change on this file since 7578 was 7578, checked in by charles, 12 years ago

(trunk libT) avoid some unnecessary memory fragmentation... for composited objects that have a tr_completion, contain the it directly rather than a pointer to one allocated elsewhere on the heap.

  • Property svn:keywords set to Date Rev Author Id
File size: 67.9 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 7578 2009-01-02 17:01:55Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h> /* memcpy, memcmp, strstr */
16#include <stdlib.h> /* qsort */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "session.h"
23#include "bandwidth.h"
24#include "bencode.h"
25#include "blocklist.h"
26#include "clients.h"
27#include "completion.h"
28#include "crypto.h"
29#include "handshake.h"
30#include "inout.h" /* tr_ioTestPiece */
31#include "net.h"
32#include "peer-io.h"
33#include "peer-mgr.h"
34#include "peer-mgr-private.h"
35#include "peer-msgs.h"
36#include "ptrarray.h"
37#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
38#include "torrent.h"
39#include "trevent.h"
40#include "utils.h"
41#include "webseed.h"
42
43enum
44{
45    /* how frequently to change which peers are choked */
46    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
47
48    /* minimum interval for refilling peers' request lists */
49    REFILL_PERIOD_MSEC = 333,
50
51    /* when many peers are available, keep idle ones this long */
52    MIN_UPLOAD_IDLE_SECS = ( 30 ),
53
54    /* when few peers are available, keep idle ones this long */
55    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
56
57    /* how frequently to decide which peers live and die */
58    RECONNECT_PERIOD_MSEC = ( 2 * 1000 ),
59   
60    /* how frequently to reallocate bandwidth */
61    BANDWIDTH_PERIOD_MSEC = 500,
62
63    /* max # of peers to ask fer per torrent per reconnect pulse */
64    MAX_RECONNECTIONS_PER_PULSE = 4,
65
66    /* max number of peers to ask for per second overall.
67    * this throttle is to avoid overloading the router */
68    MAX_CONNECTIONS_PER_SECOND = 8,
69
70    /* number of unchoked peers per torrent.
71     * FIXME: this probably ought to be configurable */
72    MAX_UNCHOKED_PEERS = 14,
73
74    /* number of bad pieces a peer is allowed to send before we ban them */
75    MAX_BAD_PIECES_PER_PEER = 5,
76
77    /* use for bitwise operations w/peer_atom.myflags */
78    MYFLAG_BANNED = 1,
79
80    /* unreachable for now... but not banned.
81     * if they try to connect to us it's okay */
82    MYFLAG_UNREACHABLE = 2,
83
84    /* the minimum we'll wait before attempting to reconnect to a peer */
85    MINIMUM_RECONNECT_INTERVAL_SECS = 5
86};
87
88
89/**
90***
91**/
92
93enum
94{
95    UPLOAD_ONLY_UKNOWN,
96    UPLOAD_ONLY_YES,
97    UPLOAD_ONLY_NO
98};
99
100/**
101 * Peer information that should be kept even before we've connected and
102 * after we've disconnected.  These are kept in a pool of peer_atoms to decide
103 * which ones would make good candidates for connecting to, and to watch out
104 * for banned peers.
105 *
106 * @see tr_peer
107 * @see tr_peermsgs
108 */
109struct peer_atom
110{
111    uint8_t     from;
112    uint8_t     flags;       /* these match the added_f flags */
113    uint8_t     myflags;     /* flags that aren't defined in added_f */
114    uint8_t     uploadOnly;  /* UPLOAD_ONLY_ */
115    tr_port     port;
116    uint16_t    numFails;
117    tr_address  addr;
118    time_t      time;        /* when the peer's connection status last changed */
119    time_t      piece_data_time;
120};
121
122typedef struct
123{
124    tr_bool         isRunning;
125
126    uint8_t         hash[SHA_DIGEST_LENGTH];
127    int         *   pendingRequestCount;
128    tr_ptrArray     outgoingHandshakes; /* tr_handshake */
129    tr_ptrArray     pool; /* struct peer_atom */
130    tr_ptrArray     peers; /* tr_peer */
131    tr_ptrArray     webseeds; /* tr_webseed */
132    tr_timer *      reconnectTimer;
133    tr_timer *      rechokeTimer;
134    tr_timer *      refillTimer;
135    tr_torrent *    tor;
136    tr_peer *       optimistic; /* the optimistic peer, or NULL if none */
137
138    struct tr_peerMgr * manager;
139}
140Torrent;
141
142struct tr_peerMgr
143{
144    tr_session      * session;
145    tr_ptrArray       torrents; /* Torrent */
146    tr_ptrArray       incomingHandshakes; /* tr_handshake */
147    tr_ptrArray       finishedHandshakes; /* tr_handshake */
148    tr_timer        * bandwidthTimer;
149};
150
151#define tordbg( t, ... ) \
152    do { \
153        if( tr_deepLoggingIsActive( ) ) \
154            tr_deepLog( __FILE__, __LINE__, t->tor->info.name, __VA_ARGS__ ); \
155    } while( 0 )
156
157#define dbgmsg( ... ) \
158    do { \
159        if( tr_deepLoggingIsActive( ) ) \
160            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
161    } while( 0 )
162
163/**
164***
165**/
166
167static void
168managerLock( const struct tr_peerMgr * manager )
169{
170    tr_globalLock( manager->session );
171}
172
173static void
174managerUnlock( const struct tr_peerMgr * manager )
175{
176    tr_globalUnlock( manager->session );
177}
178
179static void
180torrentLock( Torrent * torrent )
181{
182    managerLock( torrent->manager );
183}
184
185static void
186torrentUnlock( Torrent * torrent )
187{
188    managerUnlock( torrent->manager );
189}
190
191static int
192torrentIsLocked( const Torrent * t )
193{
194    return tr_globalIsLocked( t->manager->session );
195}
196
197/**
198***
199**/
200
201static int
202handshakeCompareToAddr( const void * va, const void * vb )
203{
204    const tr_handshake * a = va;
205
206    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
207}
208
209static int
210handshakeCompare( const void * a, const void * b )
211{
212    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
213}
214
215static tr_handshake*
216getExistingHandshake( tr_ptrArray      * handshakes,
217                      const tr_address * addr )
218{
219    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
220}
221
222static int
223comparePeerAtomToAddress( const void * va, const void * vb )
224{
225    const struct peer_atom * a = va;
226
227    return tr_compareAddresses( &a->addr, vb );
228}
229
230static int
231comparePeerAtoms( const void * va, const void * vb )
232{
233    const struct peer_atom * b = vb;
234
235    return comparePeerAtomToAddress( va, &b->addr );
236}
237
238/**
239***
240**/
241
242static int
243torrentCompare( const void * va,
244                const void * vb )
245{
246    const Torrent * a = va;
247    const Torrent * b = vb;
248
249    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
250}
251
252static int
253torrentCompareToHash( const void * va,
254                      const void * vb )
255{
256    const Torrent * a = va;
257    const uint8_t * b_hash = vb;
258
259    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
260}
261
262static Torrent*
263getExistingTorrent( tr_peerMgr *    manager,
264                    const uint8_t * hash )
265{
266    return (Torrent*) tr_ptrArrayFindSorted( &manager->torrents,
267                                             hash,
268                                             torrentCompareToHash );
269}
270
271static int
272peerCompare( const void * va, const void * vb )
273{
274    const tr_peer * a = va;
275    const tr_peer * b = vb;
276
277    return tr_compareAddresses( &a->addr, &b->addr );
278}
279
280static int
281peerCompareToAddr( const void * va, const void * vb )
282{
283    const tr_peer * a = va;
284
285    return tr_compareAddresses( &a->addr, vb );
286}
287
288static tr_peer*
289getExistingPeer( Torrent          * torrent,
290                 const tr_address * addr )
291{
292    assert( torrentIsLocked( torrent ) );
293    assert( addr );
294
295    return tr_ptrArrayFindSorted( &torrent->peers, addr, peerCompareToAddr );
296}
297
298static struct peer_atom*
299getExistingAtom( const Torrent    * t,
300                 const tr_address * addr )
301{
302    Torrent * tt = (Torrent*)t;
303    assert( torrentIsLocked( t ) );
304    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
305}
306
307static tr_bool
308peerIsInUse( const Torrent    * ct,
309             const tr_address * addr )
310{
311    Torrent * t = (Torrent*) ct;
312
313    assert( torrentIsLocked ( t ) );
314
315    return getExistingPeer( t, addr )
316        || getExistingHandshake( &t->outgoingHandshakes, addr )
317        || getExistingHandshake( &t->manager->incomingHandshakes, addr );
318}
319
320static tr_peer*
321peerConstructor( tr_torrent * tor, const tr_address * addr )
322{
323    tr_peer * p;
324    p = tr_new0( tr_peer, 1 );
325    p->addr = *addr;
326    p->bandwidth = tr_bandwidthNew( tor->session, tor->bandwidth );
327    return p;
328}
329
330static tr_peer*
331getPeer( Torrent          * torrent,
332         const tr_address * addr )
333{
334    tr_peer * peer;
335
336    assert( torrentIsLocked( torrent ) );
337
338    peer = getExistingPeer( torrent, addr );
339
340    if( peer == NULL )
341    {
342        peer = peerConstructor( torrent->tor, addr );
343        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
344    }
345
346    return peer;
347}
348
349static void
350peerDestructor( tr_peer * peer )
351{
352    assert( peer );
353
354    if( peer->msgs != NULL )
355    {
356        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
357        tr_peerMsgsFree( peer->msgs );
358    }
359
360    tr_peerIoFree( peer->io );
361
362    tr_bitfieldFree( peer->have );
363    tr_bitfieldFree( peer->blame );
364    tr_free( peer->client );
365
366    tr_bandwidthFree( peer->bandwidth );
367
368    tr_free( peer );
369}
370
371static void
372removePeer( Torrent * t,
373            tr_peer * peer )
374{
375    tr_peer *          removed;
376    struct peer_atom * atom;
377
378    assert( torrentIsLocked( t ) );
379
380    atom = getExistingAtom( t, &peer->addr );
381    assert( atom );
382    atom->time = time( NULL );
383
384    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
385    assert( removed == peer );
386    peerDestructor( removed );
387}
388
389static void
390removeAllPeers( Torrent * t )
391{
392    while( !tr_ptrArrayEmpty( &t->peers ) )
393        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
394}
395
396static void
397torrentDestructor( void * vt )
398{
399    Torrent * t = vt;
400    uint8_t   hash[SHA_DIGEST_LENGTH];
401
402    assert( t );
403    assert( !t->isRunning );
404    assert( torrentIsLocked( t ) );
405    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
406    assert( tr_ptrArrayEmpty( &t->peers ) );
407
408    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
409
410    tr_timerFree( &t->reconnectTimer );
411    tr_timerFree( &t->rechokeTimer );
412    tr_timerFree( &t->refillTimer );
413
414    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
415    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
416    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
417    tr_ptrArrayDestruct( &t->peers, NULL );
418
419    tr_free( t->pendingRequestCount );
420    tr_free( t );
421}
422
423static void peerCallbackFunc( void * vpeer,
424                              void * vevent,
425                              void * vt );
426
427static Torrent*
428torrentConstructor( tr_peerMgr * manager,
429                    tr_torrent * tor )
430{
431    int       i;
432    Torrent * t;
433
434    t = tr_new0( Torrent, 1 );
435    t->manager = manager;
436    t->tor = tor;
437    t->pool = TR_PTR_ARRAY_INIT;
438    t->peers = TR_PTR_ARRAY_INIT;
439    t->webseeds = TR_PTR_ARRAY_INIT;
440    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
441    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
442
443    for( i = 0; i < tor->info.webseedCount; ++i )
444    {
445        tr_webseed * w =
446            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
447        tr_ptrArrayAppend( &t->webseeds, w );
448    }
449
450    return t;
451}
452
453
454static int bandwidthPulse( void * vmgr );
455
456
457tr_peerMgr*
458tr_peerMgrNew( tr_session * session )
459{
460    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
461
462    m->session = session;
463    m->torrents = TR_PTR_ARRAY_INIT;
464    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
465    m->finishedHandshakes = TR_PTR_ARRAY_INIT;
466    m->bandwidthTimer = tr_timerNew( session, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
467    return m;
468}
469
470void
471tr_peerMgrFree( tr_peerMgr * manager )
472{
473    tr_handshake * handshake;
474
475    managerLock( manager );
476
477    tr_timerFree( &manager->bandwidthTimer );
478
479    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
480     * the item from manager->handshakes, so this is a little roundabout... */
481    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
482        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
483
484    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
485
486    while(( handshake = tr_ptrArrayPop( &manager->finishedHandshakes )))
487        tr_handshakeFree( handshake );
488
489    tr_ptrArrayDestruct( &manager->finishedHandshakes, NULL );
490
491    /* free the torrents. */
492    tr_ptrArrayDestruct( &manager->torrents, torrentDestructor );
493
494    managerUnlock( manager );
495    tr_free( manager );
496}
497
498static int
499clientIsDownloadingFrom( const tr_peer * peer )
500{
501    return peer->clientIsInterested && !peer->clientIsChoked;
502}
503
504static int
505clientIsUploadingTo( const tr_peer * peer )
506{
507    return peer->peerIsInterested && !peer->peerIsChoked;
508}
509
510/***
511****
512***/
513
514tr_bool
515tr_peerMgrPeerIsSeed( const tr_peerMgr  * mgr,
516                      const uint8_t     * torrentHash,
517                      const tr_address  * addr )
518{
519    tr_bool isSeed = FALSE;
520    const Torrent * t = NULL;
521    const struct peer_atom * atom = NULL;
522
523    t = getExistingTorrent( (tr_peerMgr*)mgr, torrentHash );
524    if( t )
525        atom = getExistingAtom( t, addr );
526    if( atom )
527        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
528
529    return isSeed;
530}
531
532/****
533*****
534*****  REFILL
535*****
536****/
537
538static void
539assertValidPiece( Torrent * t, tr_piece_index_t piece )
540{
541    assert( t );
542    assert( t->tor );
543    assert( piece < t->tor->info.pieceCount );
544}
545
546static int
547getPieceRequests( Torrent * t, tr_piece_index_t piece )
548{
549    assertValidPiece( t, piece );
550
551    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
552}
553
554static void
555incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
556{
557    assertValidPiece( t, piece );
558
559    if( t->pendingRequestCount == NULL )
560        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
561    t->pendingRequestCount[piece]++;
562}
563
564static void
565decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
566{
567    assertValidPiece( t, piece );
568
569    if( t->pendingRequestCount )
570        t->pendingRequestCount[piece]--;
571}
572
573struct tr_refill_piece
574{
575    tr_priority_t    priority;
576    uint32_t         piece;
577    uint32_t         peerCount;
578    int              random;
579    int              pendingRequestCount;
580    int              missingBlockCount;
581};
582
583static int
584compareRefillPiece( const void * aIn, const void * bIn )
585{
586    const struct tr_refill_piece * a = aIn;
587    const struct tr_refill_piece * b = bIn;
588
589    /* if one piece has a higher priority, it goes first */
590    if( a->priority != b->priority )
591        return a->priority > b->priority ? -1 : 1;
592
593    /* have a per-priority endgame */
594    if( a->pendingRequestCount != b->pendingRequestCount )
595        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
596
597    /* fewer missing pieces goes first */
598    if( a->missingBlockCount != b->missingBlockCount )
599        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
600
601    /* otherwise if one has fewer peers, it goes first */
602    if( a->peerCount != b->peerCount )
603        return a->peerCount < b->peerCount ? -1 : 1;
604
605    /* otherwise go with our random seed */
606    if( a->random != b->random )
607        return a->random < b->random ? -1 : 1;
608
609    return 0;
610}
611
612static tr_piece_index_t *
613getPreferredPieces( Torrent * t, tr_piece_index_t * pieceCount )
614{
615    const tr_torrent  * tor = t->tor;
616    const tr_info     * inf = &tor->info;
617    tr_piece_index_t    i;
618    tr_piece_index_t    poolSize = 0;
619    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
620    int                 peerCount;
621    const tr_peer    ** peers;
622
623    assert( torrentIsLocked( t ) );
624
625    peers = (const tr_peer**) TR_PTR_ARRAY_DATA( &t->peers );
626    peerCount = TR_PTR_ARRAY_LENGTH( &t->peers );
627
628    /* make a list of the pieces that we want but don't have */
629    for( i = 0; i < inf->pieceCount; ++i )
630        if( !tor->info.pieces[i].dnd
631                && !tr_cpPieceIsComplete( &tor->completion, i ) )
632            pool[poolSize++] = i;
633
634    /* sort the pool by which to request next */
635    if( poolSize > 1 )
636    {
637        tr_piece_index_t j;
638        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
639
640        for( j = 0; j < poolSize; ++j )
641        {
642            int k;
643            const tr_piece_index_t piece = pool[j];
644            struct tr_refill_piece * setme = p + j;
645
646            setme->piece = piece;
647            setme->priority = inf->pieces[piece].priority;
648            setme->peerCount = 0;
649            setme->random = tr_cryptoWeakRandInt( INT_MAX );
650            setme->pendingRequestCount = getPieceRequests( t, piece );
651            setme->missingBlockCount
652                         = tr_cpMissingBlocksInPiece( &tor->completion, piece );
653
654            for( k = 0; k < peerCount; ++k )
655            {
656                const tr_peer * peer = peers[k];
657                if( peer->peerIsInterested
658                        && !peer->clientIsChoked
659                        && tr_bitfieldHas( peer->have, piece ) )
660                    ++setme->peerCount;
661            }
662        }
663
664        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
665               compareRefillPiece );
666
667        for( j = 0; j < poolSize; ++j )
668            pool[j] = p[j].piece;
669
670        tr_free( p );
671    }
672
673    *pieceCount = poolSize;
674    return pool;
675}
676
677struct tr_blockIterator
678{
679    Torrent * t;
680    tr_block_index_t blockIndex, blockCount, *blocks;
681    tr_piece_index_t pieceIndex, pieceCount, *pieces;
682};
683
684static struct tr_blockIterator*
685blockIteratorNew( Torrent * t )
686{
687    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
688    i->t = t;
689    i->pieces = getPreferredPieces( t, &i->pieceCount );
690    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCountInPiece );
691    return i;
692}
693
694static int
695blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
696{
697    int found;
698    Torrent * t = i->t;
699    tr_torrent * tor = t->tor;
700
701    while( ( i->blockIndex == i->blockCount )
702        && ( i->pieceIndex < i->pieceCount ) )
703    {
704        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
705        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
706        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
707        tr_block_index_t block;
708
709        assert( index < tor->info.pieceCount );
710
711        i->blockCount = 0;
712        i->blockIndex = 0;
713        for( block=b; block!=e; ++block )
714            if( !tr_cpBlockIsComplete( &tor->completion, block ) )
715                i->blocks[i->blockCount++] = block;
716    }
717
718    assert( i->blockCount <= tor->blockCountInPiece );
719
720    if(( found = ( i->blockIndex < i->blockCount )))
721        *setme = i->blocks[i->blockIndex++];
722
723    return found;
724}
725
726static void
727blockIteratorFree( struct tr_blockIterator * i )
728{
729    tr_free( i->blocks );
730    tr_free( i->pieces );
731    tr_free( i );
732}
733
734static tr_peer**
735getPeersUploadingToClient( Torrent * t,
736                           int *     setmeCount )
737{
738    int j;
739    int peerCount = 0;
740    int retCount = 0;
741    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
742    tr_peer ** ret = tr_new( tr_peer *, peerCount );
743
744    j = 0; /* this is a temporary test to make sure we walk through all the peers */
745    if( peerCount )
746    {
747        /* Get a list of peers we're downloading from.
748           Pick a different starting point each time so all peers
749           get a chance at being the first in line */
750        const int fencepost = tr_cryptoWeakRandInt( peerCount );
751        int i = fencepost;
752        do {
753            if( clientIsDownloadingFrom( peers[i] ) )
754                ret[retCount++] = peers[i];
755            i = ( i + 1 ) % peerCount;
756            ++j;
757        } while( i != fencepost );
758    }
759    assert( j == peerCount );
760    *setmeCount = retCount;
761    return ret;
762}
763
764static uint32_t
765getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
766{
767    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
768    const uint64_t blockPos = tor->blockSize * b;
769    assert( blockPos >= piecePos );
770    return (uint32_t)( blockPos - piecePos );
771}
772
773static int
774refillPulse( void * vtorrent )
775{
776    tr_block_index_t block;
777    int peerCount;
778    int webseedCount;
779    tr_peer ** peers;
780    tr_webseed ** webseeds;
781    struct tr_blockIterator * blockIterator;
782    Torrent * t = vtorrent;
783    tr_torrent * tor = t->tor;
784
785    if( !t->isRunning )
786        return TRUE;
787    if( tr_torrentIsSeed( t->tor ) )
788        return TRUE;
789
790    torrentLock( t );
791    tordbg( t, "Refilling Request Buffers..." );
792
793    blockIterator = blockIteratorNew( t );
794    peers = getPeersUploadingToClient( t, &peerCount );
795    webseedCount = tr_ptrArraySize( &t->webseeds );
796    webseeds = tr_memdup( tr_ptrArrayBase( &t->webseeds ),
797                          webseedCount * sizeof( tr_webseed* ) );
798
799    while( ( webseedCount || peerCount )
800        && blockIteratorNext( blockIterator, &block ) )
801    {
802        int j;
803        int handled = FALSE;
804
805        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
806        const uint32_t offset = getBlockOffsetInPiece( tor, block );
807        const uint32_t length = tr_torBlockCountBytes( tor, block );
808
809        /* find a peer who can ask for this block */
810        for( j=0; !handled && j<peerCount; )
811        {
812            const tr_addreq_t val = tr_peerMsgsAddRequest( peers[j]->msgs, index, offset, length );
813            switch( val )
814            {
815                case TR_ADDREQ_FULL:
816                case TR_ADDREQ_CLIENT_CHOKED:
817                    peers[j] = peers[--peerCount];
818                    break;
819
820                case TR_ADDREQ_MISSING:
821                case TR_ADDREQ_DUPLICATE:
822                    ++j;
823                    break;
824
825                case TR_ADDREQ_OK:
826                    incrementPieceRequests( t, index );
827                    handled = TRUE;
828                    break;
829
830                default:
831                    assert( 0 && "unhandled value" );
832                    break;
833            }
834        }
835
836        /* maybe one of the webseeds can do it */
837        for( j=0; !handled && j<webseedCount; )
838        {
839            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j], index, offset, length );
840            switch( val )
841            {
842                case TR_ADDREQ_FULL:
843                    webseeds[j] = webseeds[--webseedCount];
844                    break;
845
846                case TR_ADDREQ_OK:
847                    incrementPieceRequests( t, index );
848                    handled = TRUE;
849                    break;
850
851                default:
852                    assert( 0 && "unhandled value" );
853                    break;
854            }
855        }
856    }
857
858    /* cleanup */
859    blockIteratorFree( blockIterator );
860    tr_free( webseeds );
861    tr_free( peers );
862
863    t->refillTimer = NULL;
864    torrentUnlock( t );
865    return FALSE;
866}
867
868static void
869broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
870{
871    size_t i;
872    size_t peerCount;
873    tr_peer ** peers;
874
875    assert( torrentIsLocked( t ) );
876
877    tordbg( t, "got a block; cancelling any duplicate requests from peers %"PRIu32":%"PRIu32"->%"PRIu32, index, offset, length );
878
879    peerCount = TR_PTR_ARRAY_LENGTH( &t->peers );
880    peers = (tr_peer**) TR_PTR_ARRAY_DATA( &t->peers );
881    for( i=0; i<peerCount; ++i )
882        if( peers[i]->msgs )
883            tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
884}
885
886static void
887addStrike( Torrent * t,
888           tr_peer * peer )
889{
890    tordbg( t, "increasing peer %s strike count to %d",
891            tr_peerIoAddrStr( &peer->addr,
892                              peer->port ), peer->strikes + 1 );
893
894    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
895    {
896        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
897        atom->myflags |= MYFLAG_BANNED;
898        peer->doPurge = 1;
899        tordbg( t, "banning peer %s", tr_peerIoAddrStr( &atom->addr, atom->port ) );
900    }
901}
902
903static void
904gotBadPiece( Torrent *        t,
905             tr_piece_index_t pieceIndex )
906{
907    tr_torrent *   tor = t->tor;
908    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
909
910    tor->corruptCur += byteCount;
911    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
912}
913
914static void
915refillSoon( Torrent * t )
916{
917    if( t->refillTimer == NULL )
918        t->refillTimer = tr_timerNew( t->manager->session,
919                                      refillPulse, t,
920                                      REFILL_PERIOD_MSEC );
921}
922
923static void
924peerSuggestedPiece( Torrent            * t UNUSED,
925                    tr_peer            * peer UNUSED,
926                    tr_piece_index_t     pieceIndex UNUSED,
927                    int                  isFastAllowed UNUSED )
928{
929#if 0
930    assert( t );
931    assert( peer );
932    assert( peer->msgs );
933
934    /* is this a valid piece? */
935    if(  pieceIndex >= t->tor->info.pieceCount )
936        return;
937
938    /* don't ask for it if we've already got it */
939    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
940        return;
941
942    /* don't ask for it if they don't have it */
943    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
944        return;
945
946    /* don't ask for it if we're choked and it's not fast */
947    if( !isFastAllowed && peer->clientIsChoked )
948        return;
949
950    /* request the blocks that we don't have in this piece */
951    {
952        tr_block_index_t block;
953        const tr_torrent * tor = t->tor;
954        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
955        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
956
957        for( block=start; block<end; ++block )
958        {
959            if( !tr_cpBlockIsComplete( tor->completion, block ) )
960            {
961                const uint32_t offset = getBlockOffsetInPiece( tor, block );
962                const uint32_t length = tr_torBlockCountBytes( tor, block );
963                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
964                incrementPieceRequests( t, pieceIndex );
965            }
966        }
967    }
968#endif
969}
970
971static void
972peerCallbackFunc( void * vpeer, void * vevent, void * vt )
973{
974    tr_peer * peer = vpeer; /* may be NULL if peer is a webseed */
975    Torrent * t = vt;
976    const tr_peer_event * e = vevent;
977
978    torrentLock( t );
979
980    switch( e->eventType )
981    {
982        case TR_PEER_UPLOAD_ONLY:
983            /* update our atom */
984            if( peer ) {
985                struct peer_atom * a = getExistingAtom( t, &peer->addr );
986                a->uploadOnly = e->uploadOnly ? UPLOAD_ONLY_YES : UPLOAD_ONLY_NO;
987            }
988            break;
989
990        case TR_PEER_NEED_REQ:
991            refillSoon( t );
992            break;
993
994        case TR_PEER_CANCEL:
995            decrementPieceRequests( t, e->pieceIndex );
996            break;
997
998        case TR_PEER_PEER_GOT_DATA:
999        {
1000            const time_t now = time( NULL );
1001            tr_torrent * tor = t->tor;
1002
1003            tor->activityDate = now;
1004
1005            if( e->wasPieceData )
1006                tor->uploadedCur += e->length;
1007
1008            /* update the stats */
1009            if( e->wasPieceData )
1010                tr_statsAddUploaded( tor->session, e->length );
1011
1012            /* update our atom */
1013            if( peer ) {
1014                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1015                if( e->wasPieceData )
1016                    a->piece_data_time = now;
1017            }
1018
1019            break;
1020        }
1021
1022        case TR_PEER_CLIENT_GOT_SUGGEST:
1023            if( peer )
1024                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1025            break;
1026
1027        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1028            if( peer )
1029                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1030            break;
1031
1032        case TR_PEER_CLIENT_GOT_DATA:
1033        {
1034            const time_t now = time( NULL );
1035            tr_torrent * tor = t->tor;
1036            tor->activityDate = now;
1037
1038            /* only add this to downloadedCur if we got it from a peer --
1039             * webseeds shouldn't count against our ratio.  As one tracker
1040             * admin put it, "Those pieces are downloaded directly from the
1041             * content distributor, not the peers, it is the tracker's job
1042             * to manage the swarms, not the web server and does not fit
1043             * into the jurisdiction of the tracker." */
1044            if( peer && e->wasPieceData )
1045                tor->downloadedCur += e->length;
1046
1047            /* update the stats */ 
1048            if( e->wasPieceData )
1049                tr_statsAddDownloaded( tor->session, e->length );
1050
1051            /* update our atom */
1052            if( peer ) {
1053                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1054                if( e->wasPieceData )
1055                    a->piece_data_time = now;
1056            }
1057
1058            break;
1059        }
1060
1061        case TR_PEER_PEER_PROGRESS:
1062        {
1063            if( peer )
1064            {
1065                struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1066                const int peerIsSeed = e->progress >= 1.0;
1067                if( peerIsSeed ) {
1068                    tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1069                    atom->flags |= ADDED_F_SEED_FLAG;
1070                } else {
1071                    tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1072                    atom->flags &= ~ADDED_F_SEED_FLAG;
1073                }
1074            }
1075            break;
1076        }
1077
1078        case TR_PEER_CLIENT_GOT_BLOCK:
1079        {
1080            tr_torrent *     tor = t->tor;
1081
1082            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1083
1084            tr_cpBlockAdd( &tor->completion, block );
1085            decrementPieceRequests( t, e->pieceIndex );
1086
1087            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1088
1089            if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1090            {
1091                const tr_piece_index_t p = e->pieceIndex;
1092                const tr_bool ok = tr_ioTestPiece( tor, p, NULL, 0 );
1093
1094                if( !ok )
1095                {
1096                    tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1097                               (unsigned long)p );
1098                }
1099
1100                tr_torrentSetHasPiece( tor, p, ok );
1101                tr_torrentSetPieceChecked( tor, p, TRUE );
1102                tr_peerMgrSetBlame( tor->session->peerMgr, tor->info.hash, p, ok );
1103
1104                if( !ok )
1105                    gotBadPiece( t, p );
1106                else
1107                {
1108                    int i;
1109                    int peerCount = TR_PTR_ARRAY_LENGTH( &t->peers );
1110                    tr_peer ** peers = (tr_peer**) TR_PTR_ARRAY_DATA( &t->peers );
1111                    for( i=0; i<peerCount; ++i )
1112                        tr_peerMsgsHave( peers[i]->msgs, p );
1113                }
1114
1115                tr_torrentRecheckCompleteness( tor );
1116            }
1117            break;
1118        }
1119
1120        case TR_PEER_ERROR:
1121            if( e->err == EINVAL )
1122            {
1123                addStrike( t, peer );
1124                peer->doPurge = 1;
1125                tordbg( t, "setting %s doPurge flag because we got an EINVAL error", tr_peerIoAddrStr( &peer->addr, peer->port ) );
1126            }
1127            else if( ( e->err == ERANGE )
1128                  || ( e->err == EMSGSIZE )
1129                  || ( e->err == ENOTCONN ) )
1130            {
1131                /* some protocol error from the peer */
1132                peer->doPurge = 1;
1133                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error", tr_peerIoAddrStr( &peer->addr, peer->port ) );
1134            }
1135            else /* a local error, such as an IO error */
1136            {
1137                t->tor->error = e->err;
1138                tr_strlcpy( t->tor->errorString,
1139                            tr_strerror( t->tor->error ),
1140                            sizeof( t->tor->errorString ) );
1141                tr_torrentStop( t->tor );
1142            }
1143            break;
1144
1145        default:
1146            assert( 0 );
1147    }
1148
1149    torrentUnlock( t );
1150}
1151
1152static void
1153ensureAtomExists( Torrent          * t,
1154                  const tr_address * addr,
1155                  tr_port            port,
1156                  uint8_t            flags,
1157                  uint8_t            from )
1158{
1159    if( getExistingAtom( t, addr ) == NULL )
1160    {
1161        struct peer_atom * a;
1162        a = tr_new0( struct peer_atom, 1 );
1163        a->addr = *addr;
1164        a->port = port;
1165        a->flags = flags;
1166        a->from = from;
1167        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr( &a->addr, a->port ) );
1168        tr_ptrArrayInsertSorted( &t->pool, a, comparePeerAtoms );
1169    }
1170}
1171
1172static int
1173getMaxPeerCount( const tr_torrent * tor )
1174{
1175    return tor->maxConnectedPeers;
1176}
1177
1178static int
1179getPeerCount( const Torrent * t )
1180{
1181    return tr_ptrArraySize( &t->peers ) + tr_ptrArraySize( &t->outgoingHandshakes );
1182}
1183
1184/* FIXME: this is kind of a mess. */
1185static tr_bool
1186myHandshakeDoneCB( tr_handshake  * handshake,
1187                   tr_peerIo     * io,
1188                   int             isConnected,
1189                   const uint8_t * peer_id,
1190                   void          * vmanager )
1191{
1192    tr_bool            ok = isConnected;
1193    tr_bool            success = FALSE;
1194    tr_port            port;
1195    const tr_address * addr;
1196    tr_peerMgr       * manager = vmanager;
1197    Torrent          * t;
1198    tr_handshake     * ours;
1199
1200    assert( io );
1201    assert( isConnected == 0 || isConnected == 1 );
1202
1203    t = tr_peerIoHasTorrentHash( io )
1204        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1205        : NULL;
1206
1207    if( tr_peerIoIsIncoming ( io ) )
1208        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1209                                        handshake, handshakeCompare );
1210    else if( t )
1211        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1212                                        handshake, handshakeCompare );
1213    else
1214        ours = handshake;
1215
1216    assert( ours );
1217    assert( ours == handshake );
1218
1219    if( t )
1220        torrentLock( t );
1221
1222    addr = tr_peerIoGetAddress( io, &port );
1223
1224    if( !ok || !t || !t->isRunning )
1225    {
1226        if( t )
1227        {
1228            struct peer_atom * atom = getExistingAtom( t, addr );
1229            if( atom )
1230                ++atom->numFails;
1231        }
1232    }
1233    else /* looking good */
1234    {
1235        struct peer_atom * atom;
1236        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1237        atom = getExistingAtom( t, addr );
1238        atom->time = time( NULL );
1239        atom->piece_data_time = 0;
1240
1241        if( atom->myflags & MYFLAG_BANNED )
1242        {
1243            tordbg( t, "banned peer %s tried to reconnect",
1244                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
1245        }
1246        else if( tr_peerIoIsIncoming( io )
1247               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1248
1249        {
1250        }
1251        else
1252        {
1253            tr_peer * peer = getExistingPeer( t, addr );
1254
1255            if( peer ) /* we already have this peer */
1256            {
1257            }
1258            else
1259            {
1260                peer = getPeer( t, addr );
1261                tr_free( peer->client );
1262
1263                if( !peer_id )
1264                    peer->client = NULL;
1265                else {
1266                    char client[128];
1267                    tr_clientForId( client, sizeof( client ), peer_id );
1268                    peer->client = tr_strdup( client );
1269                }
1270
1271                peer->port = port;
1272                peer->io = tr_handshakeStealIO( handshake );
1273                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1274                tr_peerIoSetBandwidth( io, peer->bandwidth );
1275
1276                success = TRUE;
1277            }
1278        }
1279    }
1280
1281    if( !success )
1282        tr_ptrArrayAppend( &manager->finishedHandshakes, handshake );
1283
1284    if( t )
1285        torrentUnlock( t );
1286
1287    return success;
1288}
1289
1290void
1291tr_peerMgrAddIncoming( tr_peerMgr * manager,
1292                       tr_address * addr,
1293                       tr_port      port,
1294                       int          socket )
1295{
1296    managerLock( manager );
1297
1298    if( tr_sessionIsAddressBlocked( manager->session, addr ) )
1299    {
1300        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
1301        tr_netClose( socket );
1302    }
1303    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
1304    {
1305        tr_netClose( socket );
1306    }
1307    else /* we don't have a connetion to them yet... */
1308    {
1309        tr_peerIo *    io;
1310        tr_handshake * handshake;
1311
1312        io = tr_peerIoNewIncoming( manager->session, addr, port, socket );
1313
1314        handshake = tr_handshakeNew( io,
1315                                     manager->session->encryptionMode,
1316                                     myHandshakeDoneCB,
1317                                     manager );
1318
1319        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
1320                                 handshakeCompare );
1321    }
1322
1323    managerUnlock( manager );
1324}
1325
1326static tr_bool
1327tr_isPex( const tr_pex * pex )
1328{
1329    return pex && tr_isAddress( &pex->addr );
1330}
1331
1332void
1333tr_peerMgrAddPex( tr_peerMgr *    manager,
1334                  const uint8_t * torrentHash,
1335                  uint8_t         from,
1336                  const tr_pex *  pex )
1337{
1338    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
1339    {
1340        Torrent * t;
1341        managerLock( manager );
1342
1343        t = getExistingTorrent( manager, torrentHash );
1344        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
1345            ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from );
1346
1347        managerUnlock( manager );
1348    }
1349}
1350
1351tr_pex *
1352tr_peerMgrCompactToPex( const void *    compact,
1353                        size_t          compactLen,
1354                        const uint8_t * added_f,
1355                        size_t          added_f_len,
1356                        size_t *        pexCount )
1357{
1358    size_t          i;
1359    size_t          n = compactLen / 6;
1360    const uint8_t * walk = compact;
1361    tr_pex *        pex = tr_new0( tr_pex, n );
1362
1363    for( i = 0; i < n; ++i )
1364    {
1365        pex[i].addr.type = TR_AF_INET;
1366        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
1367        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1368        if( added_f && ( n == added_f_len ) )
1369            pex[i].flags = added_f[i];
1370    }
1371
1372    *pexCount = n;
1373    return pex;
1374}
1375
1376tr_pex *
1377tr_peerMgrCompact6ToPex( const void    * compact,
1378                         size_t          compactLen,
1379                         const uint8_t * added_f,
1380                         size_t          added_f_len,
1381                         size_t        * pexCount )
1382{
1383    size_t          i;
1384    size_t          n = compactLen / 18;
1385    const uint8_t * walk = compact;
1386    tr_pex *        pex = tr_new0( tr_pex, n );
1387   
1388    for( i = 0; i < n; ++i )
1389    {
1390        pex[i].addr.type = TR_AF_INET6;
1391        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
1392        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1393        if( added_f && ( n == added_f_len ) )
1394            pex[i].flags = added_f[i];
1395    }
1396   
1397    *pexCount = n;
1398    return pex;
1399}
1400
1401tr_pex *
1402tr_peerMgrArrayToPex( const void * array,
1403                      size_t       arrayLen,
1404                      size_t      * pexCount )
1405{
1406    size_t          i;
1407    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
1408    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
1409    const uint8_t * walk = array;
1410    tr_pex        * pex = tr_new0( tr_pex, n );
1411   
1412    for( i = 0 ; i < n ; i++ ) {
1413        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
1414        tr_suspectAddress( &pex[i].addr, "tracker"  );
1415        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
1416        pex[i].flags = 0x00;
1417        walk += sizeof( tr_address ) + 2;
1418    }
1419   
1420    *pexCount = n;
1421    return pex;
1422}
1423
1424/**
1425***
1426**/
1427
1428void
1429tr_peerMgrSetBlame( tr_peerMgr *     manager,
1430                    const uint8_t *  torrentHash,
1431                    tr_piece_index_t pieceIndex,
1432                    int              success )
1433{
1434    if( !success )
1435    {
1436        int        peerCount, i;
1437        Torrent *  t = getExistingTorrent( manager, torrentHash );
1438        tr_peer ** peers;
1439
1440        assert( torrentIsLocked( t ) );
1441
1442        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
1443        for( i = 0; i < peerCount; ++i )
1444        {
1445            tr_peer * peer = peers[i];
1446            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1447            {
1448                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1449                        tr_peerIoAddrStr( &peer->addr, peer->port ),
1450                        pieceIndex, (int)peer->strikes + 1 );
1451                addStrike( t, peer );
1452            }
1453        }
1454    }
1455}
1456
1457int
1458tr_pexCompare( const void * va, const void * vb )
1459{
1460    const tr_pex * a = va;
1461    const tr_pex * b = vb;
1462    int i;
1463
1464    assert( tr_isPex( a ) );
1465    assert( tr_isPex( b ) );
1466
1467    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
1468        return i;
1469
1470    if( a->port != b->port )
1471        return a->port < b->port ? -1 : 1;
1472
1473    return 0;
1474}
1475
1476static int
1477peerPrefersCrypto( const tr_peer * peer )
1478{
1479    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1480        return TRUE;
1481
1482    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1483        return FALSE;
1484
1485    return tr_peerIoIsEncrypted( peer->io );
1486}
1487
1488int
1489tr_peerMgrGetPeers( tr_peerMgr      * manager,
1490                    const uint8_t   * torrentHash,
1491                    tr_pex         ** setme_pex,
1492                    uint8_t           af)
1493{
1494    int peersReturning = 0;
1495    const Torrent *  t;
1496
1497    managerLock( manager );
1498
1499    t = getExistingTorrent( manager, torrentHash );
1500    if( t == NULL )
1501    {
1502        *setme_pex = NULL;
1503    }
1504    else
1505    {
1506        int i;
1507        const tr_peer ** peers = (const tr_peer**) TR_PTR_ARRAY_DATA( &t->peers );
1508        const int peerCount = TR_PTR_ARRAY_LENGTH( &t->peers );
1509        /* for now, this will waste memory on torrents that have both
1510         * ipv6 and ipv4 peers */
1511        tr_pex * pex = tr_new( tr_pex, peerCount );
1512        tr_pex * walk = pex;
1513
1514        for( i=0; i<peerCount; ++i )
1515        {
1516            const tr_peer * peer = peers[i];
1517            if( peer->addr.type == af )
1518            {
1519                const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1520
1521                assert( tr_isAddress( &peer->addr ) );
1522                walk->addr = peer->addr;
1523                walk->port = peer->port;
1524                walk->flags = 0;
1525                if( peerPrefersCrypto( peer ) )
1526                    walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1527                if( ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 ) )
1528                    walk->flags |= ADDED_F_SEED_FLAG;
1529                ++peersReturning;
1530                ++walk;
1531            }
1532        }
1533
1534        assert( ( walk - pex ) == peersReturning );
1535        qsort( pex, peersReturning, sizeof( tr_pex ), tr_pexCompare );
1536        *setme_pex = pex;
1537    }
1538
1539    managerUnlock( manager );
1540    return peersReturning;
1541}
1542
1543static int reconnectPulse( void * vtorrent );
1544
1545static int rechokePulse( void * vtorrent );
1546
1547void
1548tr_peerMgrStartTorrent( tr_peerMgr *    manager,
1549                        const uint8_t * torrentHash )
1550{
1551    Torrent * t;
1552
1553    managerLock( manager );
1554
1555    t = getExistingTorrent( manager, torrentHash );
1556
1557    assert( t );
1558    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1559    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1560
1561    if( !t->isRunning )
1562    {
1563        t->isRunning = 1;
1564
1565        t->reconnectTimer = tr_timerNew( t->manager->session,
1566                                         reconnectPulse, t,
1567                                         RECONNECT_PERIOD_MSEC );
1568
1569        t->rechokeTimer = tr_timerNew( t->manager->session,
1570                                       rechokePulse, t,
1571                                       RECHOKE_PERIOD_MSEC );
1572
1573        reconnectPulse( t );
1574
1575        rechokePulse( t );
1576
1577        if( !tr_ptrArrayEmpty( &t->webseeds ) )
1578            refillSoon( t );
1579    }
1580
1581    managerUnlock( manager );
1582}
1583
1584static void
1585stopTorrent( Torrent * t )
1586{
1587    assert( torrentIsLocked( t ) );
1588
1589    t->isRunning = 0;
1590    tr_timerFree( &t->rechokeTimer );
1591    tr_timerFree( &t->reconnectTimer );
1592
1593    /* disconnect the peers. */
1594    tr_ptrArrayForeach( &t->peers, (PtrArrayForeachFunc)peerDestructor );
1595    tr_ptrArrayClear( &t->peers );
1596
1597    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1598     * which removes the handshake from t->outgoingHandshakes... */
1599    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
1600        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
1601}
1602
1603void
1604tr_peerMgrStopTorrent( tr_peerMgr *    manager,
1605                       const uint8_t * torrentHash )
1606{
1607    managerLock( manager );
1608
1609    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1610
1611    managerUnlock( manager );
1612}
1613
1614void
1615tr_peerMgrAddTorrent( tr_peerMgr * manager,
1616                      tr_torrent * tor )
1617{
1618    Torrent * t;
1619
1620    managerLock( manager );
1621
1622    assert( tor );
1623    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1624
1625    t = torrentConstructor( manager, tor );
1626    tr_ptrArrayInsertSorted( &manager->torrents, t, torrentCompare );
1627
1628    managerUnlock( manager );
1629}
1630
1631void
1632tr_peerMgrRemoveTorrent( tr_peerMgr *    manager,
1633                         const uint8_t * torrentHash )
1634{
1635    Torrent * t;
1636
1637    managerLock( manager );
1638
1639    t = getExistingTorrent( manager, torrentHash );
1640    assert( t );
1641    stopTorrent( t );
1642    tr_ptrArrayRemoveSorted( &manager->torrents, t, torrentCompare );
1643    torrentDestructor( t );
1644
1645    managerUnlock( manager );
1646}
1647
1648void
1649tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1650                               const uint8_t *    torrentHash,
1651                               int8_t *           tab,
1652                               unsigned int       tabCount )
1653{
1654    tr_piece_index_t   i;
1655    const Torrent *    t;
1656    const tr_torrent * tor;
1657    float              interval;
1658    tr_bool            isSeed;
1659    int                peerCount;
1660    const tr_peer **   peers;
1661    managerLock( manager );
1662
1663    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1664    tor = t->tor;
1665    interval = tor->info.pieceCount / (float)tabCount;
1666    isSeed = tor && ( tr_cpGetStatus ( &tor->completion ) == TR_SEED );
1667    peers = (const tr_peer **) TR_PTR_ARRAY_DATA( &t->peers );
1668    peerCount = TR_PTR_ARRAY_LENGTH( &t->peers );
1669
1670    memset( tab, 0, tabCount );
1671
1672    for( i = 0; tor && i < tabCount; ++i )
1673    {
1674        const int piece = i * interval;
1675
1676        if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
1677            tab[i] = -1;
1678        else if( peerCount ) {
1679            int j;
1680            for( j = 0; j < peerCount; ++j )
1681                if( tr_bitfieldHas( peers[j]->have, i ) )
1682                    ++tab[i];
1683        }
1684    }
1685
1686    managerUnlock( manager );
1687}
1688
1689/* Returns the pieces that are available from peers */
1690tr_bitfield*
1691tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1692                        const uint8_t *    torrentHash )
1693{
1694    int i;
1695    int peerCount;
1696    Torrent * t;
1697    const tr_peer ** peers;
1698    tr_bitfield * pieces;
1699    managerLock( manager );
1700
1701    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1702    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1703    peerCount = TR_PTR_ARRAY_LENGTH( &t->peers );
1704    peers = (const tr_peer**) TR_PTR_ARRAY_DATA( &t->peers );
1705    for( i=0; i<peerCount; ++i )
1706        tr_bitfieldOr( pieces, peers[i]->have );
1707
1708    managerUnlock( manager );
1709    return pieces;
1710}
1711
1712int
1713tr_peerMgrHasConnections( const tr_peerMgr * manager,
1714                          const uint8_t *    torrentHash )
1715{
1716    int ret;
1717    const Torrent * t;
1718    managerLock( manager );
1719
1720    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1721    ret = t && ( !tr_ptrArrayEmpty( &t->peers ) || !tr_ptrArrayEmpty( &t->webseeds ) );
1722
1723    managerUnlock( manager );
1724    return ret;
1725}
1726
1727void
1728tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1729                        const uint8_t    * torrentHash,
1730                        int              * setmePeersKnown,
1731                        int              * setmePeersConnected,
1732                        int              * setmeSeedsConnected,
1733                        int              * setmeWebseedsSendingToUs,
1734                        int              * setmePeersSendingToUs,
1735                        int              * setmePeersGettingFromUs,
1736                        int              * setmePeersFrom )
1737{
1738    int i, size;
1739    const Torrent * t;
1740    const tr_peer ** peers;
1741    const tr_webseed ** webseeds;
1742
1743    managerLock( manager );
1744
1745    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1746    peers = (const tr_peer **) TR_PTR_ARRAY_DATA( &t->peers );
1747    size = TR_PTR_ARRAY_LENGTH( &t->peers );
1748
1749    *setmePeersKnown           = tr_ptrArraySize( &t->pool );
1750    *setmePeersConnected       = 0;
1751    *setmeSeedsConnected       = 0;
1752    *setmePeersGettingFromUs   = 0;
1753    *setmePeersSendingToUs     = 0;
1754    *setmeWebseedsSendingToUs  = 0;
1755
1756    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1757        setmePeersFrom[i] = 0;
1758
1759    for( i=0; i<size; ++i )
1760    {
1761        const tr_peer * peer = peers[i];
1762        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1763
1764        if( peer->io == NULL ) /* not connected */
1765            continue;
1766
1767        ++*setmePeersConnected;
1768
1769        ++setmePeersFrom[atom->from];
1770
1771        if( clientIsDownloadingFrom( peer ) )
1772            ++*setmePeersSendingToUs;
1773
1774        if( clientIsUploadingTo( peer ) )
1775            ++*setmePeersGettingFromUs;
1776
1777        if( atom->flags & ADDED_F_SEED_FLAG )
1778            ++*setmeSeedsConnected;
1779    }
1780
1781    webseeds = (const tr_webseed**) TR_PTR_ARRAY_DATA( &t->webseeds );
1782    size = TR_PTR_ARRAY_LENGTH( &t->webseeds );
1783    for( i=0; i<size; ++i )
1784        if( tr_webseedIsActive( webseeds[i] ) )
1785            ++*setmeWebseedsSendingToUs;
1786
1787    managerUnlock( manager );
1788}
1789
1790float*
1791tr_peerMgrWebSpeeds( const tr_peerMgr * manager,
1792                     const uint8_t *    torrentHash )
1793{
1794    const Torrent * t;
1795    const tr_webseed ** webseeds;
1796    int i;
1797    int webseedCount;
1798    float * ret;
1799
1800    assert( manager );
1801    managerLock( manager );
1802
1803    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1804    webseeds = (const tr_webseed**) TR_PTR_ARRAY_DATA( &t->webseeds );
1805    webseedCount = TR_PTR_ARRAY_LENGTH( &t->webseeds );
1806    assert( webseedCount == t->tor->info.webseedCount );
1807    ret = tr_new0( float, webseedCount );
1808
1809    for( i=0; i<webseedCount; ++i )
1810        if( !tr_webseedGetSpeed( webseeds[i], &ret[i] ) )
1811            ret[i] = -1.0;
1812
1813    managerUnlock( manager );
1814    return ret;
1815}
1816
1817double
1818tr_peerGetPieceSpeed( const tr_peer * peer, tr_direction direction )
1819{
1820    assert( peer );
1821    assert( direction==TR_CLIENT_TO_PEER || direction==TR_PEER_TO_CLIENT );
1822
1823    return tr_bandwidthGetPieceSpeed( peer->bandwidth, direction );
1824}
1825
1826
1827struct tr_peer_stat *
1828tr_peerMgrPeerStats( const   tr_peerMgr  * manager,
1829                     const   uint8_t     * torrentHash,
1830                     int                 * setmeCount )
1831{
1832    int i, size;
1833    const Torrent * t;
1834    const tr_peer ** peers;
1835    tr_peer_stat * ret;
1836
1837    assert( manager );
1838    managerLock( manager );
1839
1840    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1841    size = TR_PTR_ARRAY_LENGTH( &t->peers );
1842    peers = (const tr_peer**) TR_PTR_ARRAY_DATA( &t->peers );
1843    ret = tr_new0( tr_peer_stat, size );
1844
1845    for( i = 0; i < size; ++i )
1846    {
1847        char *                   pch;
1848        const tr_peer *          peer = peers[i];
1849        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1850        tr_peer_stat *           stat = ret + i;
1851        tr_address               norm_addr;
1852
1853        norm_addr = peer->addr;
1854        tr_normalizeV4Mapped( &norm_addr );
1855        tr_ntop( &norm_addr, stat->addr, sizeof( stat->addr ) );
1856        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
1857                   sizeof( stat->client ) );
1858        stat->port               = ntohs( peer->port );
1859        stat->from               = atom->from;
1860        stat->progress           = peer->progress;
1861        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1862        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER );
1863        stat->rateToClient       = tr_peerGetPieceSpeed( peer, TR_PEER_TO_CLIENT );
1864        stat->peerIsChoked       = peer->peerIsChoked;
1865        stat->peerIsInterested   = peer->peerIsInterested;
1866        stat->clientIsChoked     = peer->clientIsChoked;
1867        stat->clientIsInterested = peer->clientIsInterested;
1868        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1869        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1870        stat->isUploadingTo      = clientIsUploadingTo( peer );
1871        stat->isSeed             = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
1872
1873        pch = stat->flagStr;
1874        if( t->optimistic == peer ) *pch++ = 'O';
1875        if( stat->isDownloadingFrom ) *pch++ = 'D';
1876        else if( stat->clientIsInterested ) *pch++ = 'd';
1877        if( stat->isUploadingTo ) *pch++ = 'U';
1878        else if( stat->peerIsInterested ) *pch++ = 'u';
1879        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
1880        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1881        if( stat->isEncrypted ) *pch++ = 'E';
1882        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1883        if( stat->isIncoming ) *pch++ = 'I';
1884        *pch = '\0';
1885    }
1886
1887    *setmeCount = size;
1888
1889    managerUnlock( manager );
1890    return ret;
1891}
1892
1893/**
1894***
1895**/
1896
1897struct ChokeData
1898{
1899    tr_bool         doUnchoke;
1900    tr_bool         isInterested;
1901    tr_bool         isChoked;
1902    int             rate;
1903    tr_peer *       peer;
1904};
1905
1906static int
1907compareChoke( const void * va,
1908              const void * vb )
1909{
1910    const struct ChokeData * a = va;
1911    const struct ChokeData * b = vb;
1912
1913    if( a->rate != b->rate ) /* prefer higher overall speeds */
1914        return a->rate > b->rate ? -1 : 1;
1915
1916    if( a->isChoked != b->isChoked ) /* prefer unchoked */
1917        return a->isChoked ? 1 : -1;
1918
1919    return 0;
1920}
1921
1922static int
1923isNew( const tr_peer * peer )
1924{
1925    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1926}
1927
1928static int
1929isSame( const tr_peer * peer )
1930{
1931    return peer && peer->client && strstr( peer->client, "Transmission" );
1932}
1933
1934/**
1935***
1936**/
1937
1938static void
1939rechoke( Torrent * t )
1940{
1941    int i, size, unchokedInterested;
1942    const int peerCount = TR_PTR_ARRAY_LENGTH( &t->peers );
1943    tr_peer ** peers = (tr_peer**) TR_PTR_ARRAY_DATA( &t->peers );
1944    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1945    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
1946
1947    assert( torrentIsLocked( t ) );
1948
1949    /* sort the peers by preference and rate */
1950    for( i = 0, size = 0; i < peerCount; ++i )
1951    {
1952        tr_peer * peer = peers[i];
1953        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1954
1955        if( peer->progress >= 1.0 ) /* choke all seeds */
1956        {
1957            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1958        }
1959        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
1960        {
1961            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1962        }
1963        else if( chokeAll ) /* choke everyone if we're not uploading */
1964        {
1965            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1966        }
1967        else
1968        {
1969            struct ChokeData * n = &choke[size++];
1970            n->peer         = peer;
1971            n->isInterested = peer->peerIsInterested;
1972            n->isChoked     = peer->peerIsChoked;
1973            n->rate         = tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER ) * 1024;
1974        }
1975    }
1976
1977    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
1978
1979    /**
1980     * Reciprocation and number of uploads capping is managed by unchoking
1981     * the N peers which have the best upload rate and are interested.
1982     * This maximizes the client's download rate. These N peers are
1983     * referred to as downloaders, because they are interested in downloading
1984     * from the client.
1985     *
1986     * Peers which have a better upload rate (as compared to the downloaders)
1987     * but aren't interested get unchoked. If they become interested, the
1988     * downloader with the worst upload rate gets choked. If a client has
1989     * a complete file, it uses its upload rate rather than its download
1990     * rate to decide which peers to unchoke.
1991     */
1992    unchokedInterested = 0;
1993    for( i=0; i<size && unchokedInterested<MAX_UNCHOKED_PEERS; ++i ) {
1994        choke[i].doUnchoke = 1;
1995        if( choke[i].isInterested )
1996            ++unchokedInterested;
1997    }
1998
1999    /* optimistic unchoke */
2000    if( i < size )
2001    {
2002        int n;
2003        struct ChokeData * c;
2004        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
2005
2006        for( ; i<size; ++i )
2007        {
2008            if( choke[i].isInterested )
2009            {
2010                const tr_peer * peer = choke[i].peer;
2011                int x = 1, y;
2012                if( isNew( peer ) ) x *= 3;
2013                if( isSame( peer ) ) x *= 3;
2014                for( y=0; y<x; ++y )
2015                    tr_ptrArrayAppend( &randPool, &choke[i] );
2016            }
2017        }
2018
2019        if(( n = tr_ptrArraySize( &randPool )))
2020        {
2021            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
2022            c->doUnchoke = 1;
2023            t->optimistic = c->peer;
2024        }
2025
2026        tr_ptrArrayDestruct( &randPool, NULL );
2027    }
2028
2029    for( i=0; i<size; ++i )
2030        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
2031
2032    /* cleanup */
2033    tr_free( choke );
2034}
2035
2036static int
2037rechokePulse( void * vtorrent )
2038{
2039    Torrent * t = vtorrent;
2040
2041    torrentLock( t );
2042    rechoke( t );
2043    torrentUnlock( t );
2044    return TRUE;
2045}
2046
2047/***
2048****
2049****  Life and Death
2050****
2051***/
2052
2053static int
2054shouldPeerBeClosed( const Torrent * t,
2055                    const tr_peer * peer,
2056                    int             peerCount )
2057{
2058    const tr_torrent *       tor = t->tor;
2059    const time_t             now = time( NULL );
2060    const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
2061
2062    /* if it's marked for purging, close it */
2063    if( peer->doPurge )
2064    {
2065        tordbg( t, "purging peer %s because its doPurge flag is set",
2066                tr_peerIoAddrStr( &atom->addr, atom->port ) );
2067        return TRUE;
2068    }
2069
2070    /* if we're seeding and the peer has everything we have,
2071     * and enough time has passed for a pex exchange, then disconnect */
2072    if( tr_torrentIsSeed( tor ) )
2073    {
2074        int peerHasEverything;
2075        if( atom->flags & ADDED_F_SEED_FLAG )
2076            peerHasEverything = TRUE;
2077        else if( peer->progress < tr_cpPercentDone( &tor->completion ) )
2078            peerHasEverything = FALSE;
2079        else {
2080            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( &tor->completion ) );
2081            tr_bitfieldDifference( tmp, peer->have );
2082            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2083            tr_bitfieldFree( tmp );
2084        }
2085
2086        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 )))
2087        {
2088            tordbg( t, "purging peer %s because we're both seeds",
2089                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
2090            return TRUE;
2091        }
2092    }
2093
2094    /* disconnect if it's been too long since piece data has been transferred.
2095     * this is on a sliding scale based on number of available peers... */
2096    {
2097        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2098        /* if we have >= relaxIfFewerThan, strictness is 100%.
2099         * if we have zero connections, strictness is 0% */
2100        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
2101            ? 1.0
2102            : peerCount / (float)relaxStrictnessIfFewerThanN;
2103        const int lo = MIN_UPLOAD_IDLE_SECS;
2104        const int hi = MAX_UPLOAD_IDLE_SECS;
2105        const int limit = hi - ( ( hi - lo ) * strictness );
2106        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
2107/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
2108        if( idleTime > limit ) {
2109            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
2110                       tr_peerIoAddrStr( &atom->addr, atom->port ), idleTime );
2111            return TRUE;
2112        }
2113    }
2114
2115    return FALSE;
2116}
2117
2118static tr_peer **
2119getPeersToClose( Torrent * t, int * setmeSize )
2120{
2121    int i, peerCount, outsize;
2122    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
2123    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2124
2125    assert( torrentIsLocked( t ) );
2126
2127    for( i = outsize = 0; i < peerCount; ++i )
2128        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
2129            ret[outsize++] = peers[i];
2130
2131    *setmeSize = outsize;
2132    return ret;
2133}
2134
2135static int
2136compareCandidates( const void * va,
2137                   const void * vb )
2138{
2139    const struct peer_atom * a = *(const struct peer_atom**) va;
2140    const struct peer_atom * b = *(const struct peer_atom**) vb;
2141
2142    /* <Charles> Here we would probably want to try reconnecting to
2143     * peers that had most recently given us data. Lots of users have
2144     * trouble with resets due to their routers and/or ISPs. This way we
2145     * can quickly recover from an unwanted reset. So we sort
2146     * piece_data_time in descending order.
2147     */
2148
2149    if( a->piece_data_time != b->piece_data_time )
2150        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2151
2152    if( a->numFails != b->numFails )
2153        return a->numFails < b->numFails ? -1 : 1;
2154
2155    if( a->time != b->time )
2156        return a->time < b->time ? -1 : 1;
2157
2158    /* all other things being equal, prefer peers whose
2159     * information comes from a more reliable source */
2160    if( a->from != b->from )
2161        return a->from < b->from ? -1 : 1;
2162
2163    return 0;
2164}
2165
2166static int
2167getReconnectIntervalSecs( const struct peer_atom * atom )
2168{
2169    int          sec;
2170    const time_t now = time( NULL );
2171
2172    /* if we were recently connected to this peer and transferring piece
2173     * data, try to reconnect to them sooner rather that later -- we don't
2174     * want network troubles to get in the way of a good peer. */
2175    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2176        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2177
2178    /* don't allow reconnects more often than our minimum */
2179    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2180        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2181
2182    /* otherwise, the interval depends on how many times we've tried
2183     * and failed to connect to the peer */
2184    else switch( atom->numFails ) {
2185        case 0: sec = 0; break;
2186        case 1: sec = 5; break;
2187        case 2: sec = 2 * 60; break;
2188        case 3: sec = 15 * 60; break;
2189        case 4: sec = 30 * 60; break;
2190        case 5: sec = 60 * 60; break;
2191        default: sec = 120 * 60; break;
2192    }
2193
2194    return sec;
2195}
2196
2197static struct peer_atom **
2198getPeerCandidates( Torrent * t, int * setmeSize )
2199{
2200    int                 i, atomCount, retCount;
2201    struct peer_atom ** atoms;
2202    struct peer_atom ** ret;
2203    const time_t        now = time( NULL );
2204    const int           seed = tr_torrentIsSeed( t->tor );
2205
2206    assert( torrentIsLocked( t ) );
2207
2208    atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
2209    ret = tr_new( struct peer_atom*, atomCount );
2210    for( i = retCount = 0; i < atomCount; ++i )
2211    {
2212        int                interval;
2213        struct peer_atom * atom = atoms[i];
2214
2215        /* peer fed us too much bad data ... we only keep it around
2216         * now to weed it out in case someone sends it to us via pex */
2217        if( atom->myflags & MYFLAG_BANNED )
2218            continue;
2219
2220        /* peer was unconnectable before, so we're not going to keep trying.
2221         * this is needs a separate flag from `banned', since if they try
2222         * to connect to us later, we'll let them in */
2223        if( atom->myflags & MYFLAG_UNREACHABLE )
2224            continue;
2225
2226        /* we don't need two connections to the same peer... */
2227        if( peerIsInUse( t, &atom->addr ) )
2228            continue;
2229
2230        /* no need to connect if we're both seeds... */
2231        if( seed && ( ( atom->flags & ADDED_F_SEED_FLAG ) ||
2232                      ( atom->uploadOnly == UPLOAD_ONLY_YES ) ) )
2233            continue;
2234
2235        /* don't reconnect too often */
2236        interval = getReconnectIntervalSecs( atom );
2237        if( ( now - atom->time ) < interval )
2238        {
2239            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2240                    i, tr_peerIoAddrStr( &atom->addr, atom->port ), interval );
2241            continue;
2242        }
2243
2244        /* Don't connect to peers in our blocklist */
2245        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2246            continue;
2247
2248        ret[retCount++] = atom;
2249    }
2250
2251    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2252    *setmeSize = retCount;
2253    return ret;
2254}
2255
2256static int
2257reconnectPulse( void * vtorrent )
2258{
2259    Torrent *     t = vtorrent;
2260    static time_t prevTime = 0;
2261    static int    newConnectionsThisSecond = 0;
2262    time_t        now;
2263
2264    torrentLock( t );
2265
2266    now = time( NULL );
2267    if( prevTime != now )
2268    {
2269        prevTime = now;
2270        newConnectionsThisSecond = 0;
2271    }
2272
2273    if( !t->isRunning )
2274    {
2275        removeAllPeers( t );
2276    }
2277    else
2278    {
2279        int i, nCandidates, nBad;
2280        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
2281        struct tr_peer ** connections = getPeersToClose( t, &nBad );
2282
2283        if( nBad || nCandidates )
2284            tordbg( t, "reconnect pulse for [%s]: %d bad connections, "
2285                    "%d connection candidates, %d atoms, max per pulse is %d",
2286                    t->tor->info.name, nBad, nCandidates,
2287                    tr_ptrArraySize( &t->pool ),
2288                    (int)MAX_RECONNECTIONS_PER_PULSE );
2289
2290        /* disconnect some peers.
2291           if we transferred piece data, then they might be good peers,
2292           so reset their `numFails' weight to zero.  otherwise we connected
2293           to them fruitlessly, so mark it as another fail */
2294        for( i = 0; i < nBad; ++i ) {
2295            tr_peer * peer = connections[i];
2296            struct peer_atom * atom = getExistingAtom( t, &peer->addr );
2297            if( atom->piece_data_time )
2298                atom->numFails = 0;
2299            else
2300                ++atom->numFails;
2301            tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
2302            removePeer( t, peer );
2303        }
2304
2305        /* add some new ones */
2306        for( i = 0;    ( i < nCandidates )
2307           && ( i < MAX_RECONNECTIONS_PER_PULSE )
2308           && ( getPeerCount( t ) < getMaxPeerCount( t->tor ) )
2309           && ( newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND ); ++i )
2310        {
2311            tr_peerMgr *       mgr = t->manager;
2312            struct peer_atom * atom = candidates[i];
2313            tr_peerIo *        io;
2314
2315            tordbg( t, "Starting an OUTGOING connection with %s",
2316                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2317
2318            io = tr_peerIoNewOutgoing( mgr->session, &atom->addr, atom->port, t->hash );
2319            if( io == NULL )
2320            {
2321                atom->myflags |= MYFLAG_UNREACHABLE;
2322            }
2323            else
2324            {
2325                tr_handshake * handshake = tr_handshakeNew( io,
2326                                                            mgr->session->encryptionMode,
2327                                                            myHandshakeDoneCB,
2328                                                            mgr );
2329
2330                assert( tr_peerIoGetTorrentHash( io ) );
2331
2332                ++newConnectionsThisSecond;
2333
2334                tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
2335                                         handshakeCompare );
2336            }
2337
2338            atom->time = time( NULL );
2339        }
2340
2341        /* cleanup */
2342        tr_free( connections );
2343        tr_free( candidates );
2344    }
2345
2346    torrentUnlock( t );
2347    return TRUE;
2348}
2349
2350/****
2351*****
2352*****  BANDWIDTH ALLOCATION
2353*****
2354****/
2355
2356static void
2357pumpAllPeers( tr_peerMgr * mgr )
2358{
2359    const int torrentCount = tr_ptrArraySize( &mgr->torrents );
2360    int       i, j;
2361
2362    for( i=0; i<torrentCount; ++i )
2363    {
2364        Torrent * t = tr_ptrArrayNth( &mgr->torrents, i );
2365        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
2366        {
2367            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
2368            tr_peerMsgsPulse( peer->msgs );
2369        }
2370    }
2371}
2372
2373static int
2374bandwidthPulse( void * vmgr )
2375{
2376    tr_handshake * handshake;
2377    tr_peerMgr * mgr = vmgr;
2378    managerLock( mgr );
2379
2380    /* FIXME: this next line probably isn't necessary... */
2381    pumpAllPeers( mgr );
2382
2383    /* allocate bandwidth to the peers */
2384    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
2385    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
2386
2387    /* free all the finished handshakes */
2388    while(( handshake = tr_ptrArrayPop( &mgr->finishedHandshakes )))
2389        tr_handshakeFree( handshake );
2390
2391    managerUnlock( mgr );
2392    return TRUE;
2393}
Note: See TracBrowser for help on using the repository browser.