source: branches/1.5x/libtransmission/peer-mgr.c @ 8204

Last change on this file since 8204 was 8204, checked in by charles, 13 years ago

(1.5x libT) various backports for 1.52:
(1) recognize Aria2 as a client
(2) remove jhujhiti's tr_suspectAddress(), since he removed it from trunka
(3) on Mac, better detection of where the Web UI files are located
(4) reintroduce the web task queue
(5) various minor formatting changes to reduce the diffs between 1.52 and trunk

  • Property svn:keywords set to Date Rev Author Id
File size: 69.6 KB
Line 
1
2/*
3 * This file Copyright (C) 2007-2009 Charles Kerr <charles@transmissionbt.com>
4 *
5 * This file is licensed by the GPL version 2.  Works owned by the
6 * Transmission project are granted a special exemption to clause 2(b)
7 * so that the bulk of its code can remain under the MIT license.
8 * This exemption does not extend to derived works not owned by
9 * the Transmission project.
10 *
11 * $Id: peer-mgr.c 8204 2009-04-10 17:34:25Z charles $
12 */
13
14#include <assert.h>
15#include <errno.h>
16#include <string.h> /* memcpy, memcmp, strstr */
17#include <stdlib.h> /* qsort */
18#include <limits.h> /* INT_MAX */
19
20#include <event.h>
21
22#include "transmission.h"
23#include "session.h"
24#include "bandwidth.h"
25#include "bencode.h"
26#include "blocklist.h"
27#include "clients.h"
28#include "completion.h"
29#include "crypto.h"
30#include "fdlimit.h"
31#include "handshake.h"
32#include "inout.h" /* tr_ioTestPiece */
33#include "net.h"
34#include "peer-io.h"
35#include "peer-mgr.h"
36#include "peer-msgs.h"
37#include "ptrarray.h"
38#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
39#include "torrent.h"
40#include "trevent.h"
41#include "utils.h"
42#include "webseed.h"
43
44enum
45{
46    /* how frequently to change which peers are choked */
47    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
48
49    /* minimum interval for refilling peers' request lists */
50    REFILL_PERIOD_MSEC = 400,
51   
52    /* how frequently to reallocate bandwidth */
53    BANDWIDTH_PERIOD_MSEC = 500,
54
55    /* how frequently to age out old piece request lists */
56    REFILL_UPKEEP_PERIOD_MSEC = 10000,
57
58    /* how frequently to decide which peers live and die */
59    RECONNECT_PERIOD_MSEC = 500,
60
61    /* when many peers are available, keep idle ones this long */
62    MIN_UPLOAD_IDLE_SECS = ( 30 ),
63
64    /* when few peers are available, keep idle ones this long */
65    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
66
67    /* max # of peers to ask fer per torrent per reconnect pulse */
68    MAX_RECONNECTIONS_PER_PULSE = 16,
69
70    /* max number of peers to ask for per second overall.
71    * this throttle is to avoid overloading the router */
72    MAX_CONNECTIONS_PER_SECOND = 32,
73
74    /* number of bad pieces a peer is allowed to send before we ban them */
75    MAX_BAD_PIECES_PER_PEER = 5,
76
77    /* amount of time to keep a list of request pieces lying around
78       before it's considered too old and needs to be rebuilt */
79    PIECE_LIST_SHELF_LIFE_SECS = 60,
80
81    /* use for bitwise operations w/peer_atom.myflags */
82    MYFLAG_BANNED = 1,
83
84    /* use for bitwise operations w/peer_atom.myflags */
85    /* unreachable for now... but not banned.
86     * if they try to connect to us it's okay */
87    MYFLAG_UNREACHABLE = 2,
88
89    /* the minimum we'll wait before attempting to reconnect to a peer */
90    MINIMUM_RECONNECT_INTERVAL_SECS = 5
91};
92
93
94/**
95***
96**/
97
98enum
99{
100    UPLOAD_ONLY_UKNOWN,
101    UPLOAD_ONLY_YES,
102    UPLOAD_ONLY_NO
103};
104
105/**
106 * Peer information that should be kept even before we've connected and
107 * after we've disconnected.  These are kept in a pool of peer_atoms to decide
108 * which ones would make good candidates for connecting to, and to watch out
109 * for banned peers.
110 *
111 * @see tr_peer
112 * @see tr_peermsgs
113 */
114struct peer_atom
115{
116    uint8_t     from;
117    uint8_t     flags;       /* these match the added_f flags */
118    uint8_t     myflags;     /* flags that aren't defined in added_f */
119    uint8_t     uploadOnly;  /* UPLOAD_ONLY_ */
120    tr_port     port;
121    uint16_t    numFails;
122    tr_address  addr;
123    time_t      time;        /* when the peer's connection status last changed */
124    time_t      piece_data_time;
125};
126
127struct tr_blockIterator
128{
129    time_t expirationDate;
130    struct tr_torrent_peers * t;
131    tr_block_index_t blockIndex, blockCount, *blocks;
132    tr_piece_index_t pieceIndex, pieceCount, *pieces;
133};
134
135typedef struct tr_torrent_peers
136{
137    tr_bool                    isRunning;
138
139    uint8_t                    hash[SHA_DIGEST_LENGTH];
140    int                      * pendingRequestCount;
141    tr_ptrArray                outgoingHandshakes; /* tr_handshake */
142    tr_ptrArray                pool; /* struct peer_atom */
143    tr_ptrArray                peers; /* tr_peer */
144    tr_ptrArray                webseeds; /* tr_webseed */
145    tr_timer                 * refillTimer;
146    tr_torrent               * tor;
147    tr_peer                  * optimistic; /* the optimistic peer, or NULL if none */
148    struct tr_blockIterator  * refillQueue; /* used in refillPulse() */
149
150    struct tr_peerMgr        * manager;
151}
152Torrent;
153
154struct tr_peerMgr
155{
156    tr_session      * session;
157    tr_ptrArray       incomingHandshakes; /* tr_handshake */
158    tr_timer        * bandwidthTimer;
159    tr_timer        * rechokeTimer;
160    tr_timer        * reconnectTimer;
161    tr_timer        * refillUpkeepTimer;
162};
163
164#define tordbg( t, ... ) \
165    do { \
166        if( tr_deepLoggingIsActive( ) ) \
167            tr_deepLog( __FILE__, __LINE__, t->tor->info.name, __VA_ARGS__ ); \
168    } while( 0 )
169
170#define dbgmsg( ... ) \
171    do { \
172        if( tr_deepLoggingIsActive( ) ) \
173            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
174    } while( 0 )
175
176/**
177***
178**/
179
180static TR_INLINE void
181managerLock( const struct tr_peerMgr * manager )
182{
183    tr_globalLock( manager->session );
184}
185
186static TR_INLINE void
187managerUnlock( const struct tr_peerMgr * manager )
188{
189    tr_globalUnlock( manager->session );
190}
191
192static TR_INLINE void
193torrentLock( Torrent * torrent )
194{
195    managerLock( torrent->manager );
196}
197
198static TR_INLINE void
199torrentUnlock( Torrent * torrent )
200{
201    managerUnlock( torrent->manager );
202}
203
204static TR_INLINE int
205torrentIsLocked( const Torrent * t )
206{
207    return tr_globalIsLocked( t->manager->session );
208}
209
210/**
211***
212**/
213
214static int
215handshakeCompareToAddr( const void * va, const void * vb )
216{
217    const tr_handshake * a = va;
218
219    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
220}
221
222static int
223handshakeCompare( const void * a, const void * b )
224{
225    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
226}
227
228static tr_handshake*
229getExistingHandshake( tr_ptrArray      * handshakes,
230                      const tr_address * addr )
231{
232    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
233}
234
235static int
236comparePeerAtomToAddress( const void * va, const void * vb )
237{
238    const struct peer_atom * a = va;
239
240    return tr_compareAddresses( &a->addr, vb );
241}
242
243static int
244comparePeerAtoms( const void * va, const void * vb )
245{
246    const struct peer_atom * b = vb;
247
248    return comparePeerAtomToAddress( va, &b->addr );
249}
250
251/**
252***
253**/
254
255static Torrent*
256getExistingTorrent( tr_peerMgr *    manager,
257                    const uint8_t * hash )
258{
259    tr_torrent * tor = tr_torrentFindFromHash( manager->session, hash );
260
261    return tor == NULL ? NULL : tor->torrentPeers;
262}
263
264static int
265peerCompare( const void * va, const void * vb )
266{
267    const tr_peer * a = va;
268    const tr_peer * b = vb;
269
270    return tr_compareAddresses( &a->addr, &b->addr );
271}
272
273static int
274peerCompareToAddr( const void * va, const void * vb )
275{
276    const tr_peer * a = va;
277
278    return tr_compareAddresses( &a->addr, vb );
279}
280
281static tr_peer*
282getExistingPeer( Torrent          * torrent,
283                 const tr_address * addr )
284{
285    assert( torrentIsLocked( torrent ) );
286    assert( addr );
287
288    return tr_ptrArrayFindSorted( &torrent->peers, addr, peerCompareToAddr );
289}
290
291static struct peer_atom*
292getExistingAtom( const Torrent    * t,
293                 const tr_address * addr )
294{
295    Torrent * tt = (Torrent*)t;
296    assert( torrentIsLocked( t ) );
297    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
298}
299
300static tr_bool
301peerIsInUse( const Torrent    * ct,
302             const tr_address * addr )
303{
304    Torrent * t = (Torrent*) ct;
305
306    assert( torrentIsLocked ( t ) );
307
308    return getExistingPeer( t, addr )
309        || getExistingHandshake( &t->outgoingHandshakes, addr )
310        || getExistingHandshake( &t->manager->incomingHandshakes, addr );
311}
312
313static tr_peer*
314peerConstructor( const tr_address * addr )
315{
316    tr_peer * p;
317    p = tr_new0( tr_peer, 1 );
318    p->addr = *addr;
319    return p;
320}
321
322static tr_peer*
323getPeer( Torrent          * torrent,
324         const tr_address * addr )
325{
326    tr_peer * peer;
327
328    assert( torrentIsLocked( torrent ) );
329
330    peer = getExistingPeer( torrent, addr );
331
332    if( peer == NULL )
333    {
334        peer = peerConstructor( addr );
335        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
336    }
337
338    return peer;
339}
340
341static void
342peerDestructor( tr_peer * peer )
343{
344    assert( peer );
345
346    if( peer->msgs != NULL )
347    {
348        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
349        tr_peerMsgsFree( peer->msgs );
350    }
351
352    tr_peerIoClear( peer->io );
353    tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
354
355    tr_bitfieldFree( peer->have );
356    tr_bitfieldFree( peer->blame );
357    tr_free( peer->client );
358
359    tr_free( peer );
360}
361
362static void
363removePeer( Torrent * t,
364            tr_peer * peer )
365{
366    tr_peer *          removed;
367    struct peer_atom * atom;
368
369    assert( torrentIsLocked( t ) );
370
371    atom = getExistingAtom( t, &peer->addr );
372    assert( atom );
373    atom->time = time( NULL );
374
375    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
376    assert( removed == peer );
377    peerDestructor( removed );
378}
379
380static void
381removeAllPeers( Torrent * t )
382{
383    while( !tr_ptrArrayEmpty( &t->peers ) )
384        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
385}
386
387static void blockIteratorFree( struct tr_blockIterator ** inout );
388
389static void
390torrentDestructor( void * vt )
391{
392    Torrent * t = vt;
393    uint8_t   hash[SHA_DIGEST_LENGTH];
394
395    assert( t );
396    assert( !t->isRunning );
397    assert( torrentIsLocked( t ) );
398    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
399    assert( tr_ptrArrayEmpty( &t->peers ) );
400
401    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
402
403    tr_timerFree( &t->refillTimer );
404
405    blockIteratorFree( &t->refillQueue );
406    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
407    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
408    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
409    tr_ptrArrayDestruct( &t->peers, NULL );
410
411    tr_free( t->pendingRequestCount );
412    tr_free( t );
413}
414
415static void peerCallbackFunc( void * vpeer,
416                              void * vevent,
417                              void * vt );
418
419static Torrent*
420torrentConstructor( tr_peerMgr * manager,
421                    tr_torrent * tor )
422{
423    int       i;
424    Torrent * t;
425
426    t = tr_new0( Torrent, 1 );
427    t->manager = manager;
428    t->tor = tor;
429    t->pool = TR_PTR_ARRAY_INIT;
430    t->peers = TR_PTR_ARRAY_INIT;
431    t->webseeds = TR_PTR_ARRAY_INIT;
432    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
433    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
434
435    for( i = 0; i < tor->info.webseedCount; ++i )
436    {
437        tr_webseed * w =
438            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
439        tr_ptrArrayAppend( &t->webseeds, w );
440    }
441
442    return t;
443}
444
445
446static int bandwidthPulse ( void * vmgr );
447static int rechokePulse   ( void * vmgr );
448static int reconnectPulse ( void * vmgr );
449static int refillUpkeep   ( void * vmgr );
450
451tr_peerMgr*
452tr_peerMgrNew( tr_session * session )
453{
454    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
455
456    m->session = session;
457    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
458    m->bandwidthTimer    = tr_timerNew( session, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
459    m->rechokeTimer      = tr_timerNew( session, rechokePulse,   m, RECHOKE_PERIOD_MSEC );
460    m->reconnectTimer    = tr_timerNew( session, reconnectPulse, m, RECONNECT_PERIOD_MSEC );
461    m->refillUpkeepTimer = tr_timerNew( session, refillUpkeep,   m, REFILL_UPKEEP_PERIOD_MSEC );
462
463    rechokePulse( m );
464
465    return m;
466}
467
468void
469tr_peerMgrFree( tr_peerMgr * manager )
470{
471    managerLock( manager );
472
473    tr_timerFree( &manager->refillUpkeepTimer );
474    tr_timerFree( &manager->reconnectTimer );
475    tr_timerFree( &manager->rechokeTimer );
476    tr_timerFree( &manager->bandwidthTimer );
477
478    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
479     * the item from manager->handshakes, so this is a little roundabout... */
480    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
481        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
482
483    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
484
485    managerUnlock( manager );
486    tr_free( manager );
487}
488
489static int
490clientIsDownloadingFrom( const tr_peer * peer )
491{
492    return peer->clientIsInterested && !peer->clientIsChoked;
493}
494
495static int
496clientIsUploadingTo( const tr_peer * peer )
497{
498    return peer->peerIsInterested && !peer->peerIsChoked;
499}
500
501/***
502****
503***/
504
505tr_bool
506tr_peerMgrPeerIsSeed( const tr_torrent  * tor,
507                      const tr_address  * addr )
508{
509    tr_bool isSeed = FALSE;
510    const Torrent * t = tor->torrentPeers;
511    const struct peer_atom * atom = getExistingAtom( t, addr );
512
513    if( atom )
514        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
515
516    return isSeed;
517}
518
519/****
520*****
521*****  REFILL
522*****
523****/
524
525static void
526assertValidPiece( Torrent * t, tr_piece_index_t piece )
527{
528    assert( t );
529    assert( t->tor );
530    assert( piece < t->tor->info.pieceCount );
531}
532
533static int
534getPieceRequests( Torrent * t, tr_piece_index_t piece )
535{
536    assertValidPiece( t, piece );
537
538    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
539}
540
541static void
542incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
543{
544    assertValidPiece( t, piece );
545
546    if( t->pendingRequestCount == NULL )
547        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
548    t->pendingRequestCount[piece]++;
549}
550
551static void
552decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
553{
554    assertValidPiece( t, piece );
555
556    if( t->pendingRequestCount )
557        t->pendingRequestCount[piece]--;
558}
559
560struct tr_refill_piece
561{
562    tr_priority_t    priority;
563    uint32_t         piece;
564    uint32_t         peerCount;
565    int              random;
566    int              pendingRequestCount;
567    int              missingBlockCount;
568};
569
570static int
571compareRefillPiece( const void * aIn, const void * bIn )
572{
573    const struct tr_refill_piece * a = aIn;
574    const struct tr_refill_piece * b = bIn;
575
576    /* if one piece has a higher priority, it goes first */
577    if( a->priority != b->priority )
578        return a->priority > b->priority ? -1 : 1;
579
580    /* have a per-priority endgame */
581    if( a->pendingRequestCount != b->pendingRequestCount )
582        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
583
584    /* fewer missing pieces goes first */
585    if( a->missingBlockCount != b->missingBlockCount )
586        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
587
588    /* otherwise if one has fewer peers, it goes first */
589    if( a->peerCount != b->peerCount )
590        return a->peerCount < b->peerCount ? -1 : 1;
591
592    /* otherwise go with our random seed */
593    if( a->random != b->random )
594        return a->random < b->random ? -1 : 1;
595
596    return 0;
597}
598
599static tr_piece_index_t *
600getPreferredPieces( Torrent * t, tr_piece_index_t * pieceCount )
601{
602    const tr_torrent  * tor = t->tor;
603    const tr_info     * inf = &tor->info;
604    tr_piece_index_t    i;
605    tr_piece_index_t    poolSize = 0;
606    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
607    int                 peerCount;
608    const tr_peer    ** peers;
609
610    assert( torrentIsLocked( t ) );
611
612    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
613    peerCount = tr_ptrArraySize( &t->peers );
614
615    /* make a list of the pieces that we want but don't have */
616    for( i = 0; i < inf->pieceCount; ++i )
617        if( !tor->info.pieces[i].dnd
618                && !tr_cpPieceIsComplete( &tor->completion, i ) )
619            pool[poolSize++] = i;
620
621    /* sort the pool by which to request next */
622    if( poolSize > 1 )
623    {
624        tr_piece_index_t j;
625        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
626
627        for( j = 0; j < poolSize; ++j )
628        {
629            int k;
630            const tr_piece_index_t piece = pool[j];
631            struct tr_refill_piece * setme = p + j;
632
633            setme->piece = piece;
634            setme->priority = inf->pieces[piece].priority;
635            setme->peerCount = 0;
636            setme->random = tr_cryptoWeakRandInt( INT_MAX );
637            setme->pendingRequestCount = getPieceRequests( t, piece );
638            setme->missingBlockCount
639                         = tr_cpMissingBlocksInPiece( &tor->completion, piece );
640
641            for( k = 0; k < peerCount; ++k )
642            {
643                const tr_peer * peer = peers[k];
644                if( peer->peerIsInterested
645                        && !peer->clientIsChoked
646                        && tr_bitfieldHas( peer->have, piece ) )
647                    ++setme->peerCount;
648            }
649        }
650
651        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
652               compareRefillPiece );
653
654        for( j = 0; j < poolSize; ++j )
655            pool[j] = p[j].piece;
656
657        tr_free( p );
658    }
659
660    *pieceCount = poolSize;
661    return pool;
662}
663
664static struct tr_blockIterator*
665blockIteratorNew( Torrent * t )
666{
667    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
668    i->expirationDate = time( NULL ) + PIECE_LIST_SHELF_LIFE_SECS;
669    i->t = t;
670    i->pieces = getPreferredPieces( t, &i->pieceCount );
671    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCountInPiece );
672    tordbg( t, "creating new refill queue.. it contains %"PRIu32" pieces", i->pieceCount );
673    return i;
674}
675
676static tr_bool
677blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
678{
679    tr_bool found;
680    Torrent * t = i->t;
681    tr_torrent * tor = t->tor;
682
683    while( ( i->blockIndex == i->blockCount )
684        && ( i->pieceIndex < i->pieceCount ) )
685    {
686        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
687        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
688        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
689        tr_block_index_t block;
690
691        assert( index < tor->info.pieceCount );
692
693        i->blockCount = 0;
694        i->blockIndex = 0;
695        for( block=b; block!=e; ++block )
696            if( !tr_cpBlockIsComplete( &tor->completion, block ) )
697                i->blocks[i->blockCount++] = block;
698    }
699
700    assert( i->blockCount <= tor->blockCountInPiece );
701
702    if(( found = ( i->blockIndex < i->blockCount )))
703        *setme = i->blocks[i->blockIndex++];
704
705    return found;
706}
707
708static void
709blockIteratorSkipCurrentPiece( struct tr_blockIterator * i )
710{
711    i->blockIndex = i->blockCount;
712}
713
714static void
715blockIteratorFree( struct tr_blockIterator ** inout )
716{
717    struct tr_blockIterator * it = *inout;
718
719    if( it != NULL )
720    {
721        tr_free( it->blocks );
722        tr_free( it->pieces );
723        tr_free( it );
724    }
725
726    *inout = NULL;
727}
728
729static tr_peer**
730getPeersUploadingToClient( Torrent * t,
731                           int *     setmeCount )
732{
733    int j;
734    int peerCount = 0;
735    int retCount = 0;
736    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
737    tr_peer ** ret = tr_new( tr_peer *, peerCount );
738
739    j = 0; /* this is a temporary test to make sure we walk through all the peers */
740    if( peerCount )
741    {
742        /* Get a list of peers we're downloading from.
743           Pick a different starting point each time so all peers
744           get a chance at being the first in line */
745        const int fencepost = tr_cryptoWeakRandInt( peerCount );
746        int i = fencepost;
747        do {
748            if( clientIsDownloadingFrom( peers[i] ) )
749                ret[retCount++] = peers[i];
750            i = ( i + 1 ) % peerCount;
751            ++j;
752        } while( i != fencepost );
753    }
754    assert( j == peerCount );
755    *setmeCount = retCount;
756    return ret;
757}
758
759static uint32_t
760getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
761{
762    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
763    const uint64_t blockPos = tor->blockSize * b;
764    assert( blockPos >= piecePos );
765    return (uint32_t)( blockPos - piecePos );
766}
767
768static int
769refillUpkeep( void * vmgr )
770{
771    tr_torrent * tor = NULL;
772    tr_peerMgr * mgr = vmgr;
773    time_t now;
774    managerLock( mgr );
775
776    now = time( NULL );
777    while(( tor = tr_torrentNext( mgr->session, tor ))) {
778        Torrent * t = tor->torrentPeers;
779        if( t && t->refillQueue && ( t->refillQueue->expirationDate <= now ) ) {
780            tordbg( t, "refill queue is past its shelf date; discarding." );
781            blockIteratorFree( &t->refillQueue );
782        }
783    }
784
785    managerUnlock( mgr );
786    return TRUE;
787}
788
789static int
790refillPulse( void * vtorrent )
791{
792    tr_block_index_t block;
793    int peerCount;
794    int webseedCount;
795    tr_peer ** peers;
796    tr_webseed ** webseeds;
797    Torrent * t = vtorrent;
798    tr_torrent * tor = t->tor;
799    tr_bool hasNext = TRUE;
800
801    if( !t->isRunning )
802        return TRUE;
803    if( tr_torrentIsSeed( t->tor ) )
804        return TRUE;
805
806    torrentLock( t );
807    tordbg( t, "Refilling Request Buffers..." );
808
809    if( t->refillQueue == NULL )
810        t->refillQueue = blockIteratorNew( t );
811
812    peers = getPeersUploadingToClient( t, &peerCount );
813    webseedCount = tr_ptrArraySize( &t->webseeds );
814    webseeds = tr_memdup( tr_ptrArrayBase( &t->webseeds ),
815                          webseedCount * sizeof( tr_webseed* ) );
816
817    while( ( webseedCount || peerCount )
818        && (( hasNext = blockIteratorNext( t->refillQueue, &block ))) )
819    {
820        int j;
821        tr_bool handled = FALSE;
822
823        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
824        const uint32_t offset = getBlockOffsetInPiece( tor, block );
825        const uint32_t length = tr_torBlockCountBytes( tor, block );
826
827        assert( block < tor->blockCount );
828
829        /* find a peer who can ask for this block */
830        for( j=0; !handled && j<peerCount; )
831        {
832            const tr_addreq_t val = tr_peerMsgsAddRequest( peers[j]->msgs, index, offset, length );
833            switch( val )
834            {
835                case TR_ADDREQ_FULL:
836                case TR_ADDREQ_CLIENT_CHOKED:
837                    peers[j] = peers[--peerCount];
838                    break;
839
840                case TR_ADDREQ_MISSING:
841                case TR_ADDREQ_DUPLICATE:
842                    ++j;
843                    break;
844
845                case TR_ADDREQ_OK:
846                    incrementPieceRequests( t, index );
847                    handled = TRUE;
848                    break;
849
850                default:
851                    assert( 0 && "unhandled value" );
852                    break;
853            }
854        }
855
856        /* maybe one of the webseeds can do it */
857        for( j=0; !handled && j<webseedCount; )
858        {
859            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j], index, offset, length );
860            switch( val )
861            {
862                case TR_ADDREQ_FULL:
863                    webseeds[j] = webseeds[--webseedCount];
864                    break;
865
866                case TR_ADDREQ_OK:
867                    incrementPieceRequests( t, index );
868                    handled = TRUE;
869                    break;
870
871                default:
872                    assert( 0 && "unhandled value" );
873                    break;
874            }
875        }
876
877        if( !handled )
878            blockIteratorSkipCurrentPiece( t->refillQueue );
879    }
880
881    /* cleanup */
882    tr_free( webseeds );
883    tr_free( peers );
884
885    if( !hasNext ) {
886        tordbg( t, "refill queue has no more blocks to request... freeing (webseed count: %d, peer count: %d)", webseedCount, peerCount );
887        blockIteratorFree( &t->refillQueue );
888    }
889
890    t->refillTimer = NULL;
891    torrentUnlock( t );
892    return FALSE;
893}
894
895static void
896broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
897{
898    size_t i;
899    size_t peerCount;
900    tr_peer ** peers;
901
902    assert( torrentIsLocked( t ) );
903
904    tordbg( t, "got a block; cancelling any duplicate requests from peers %"PRIu32":%"PRIu32"->%"PRIu32, index, offset, length );
905
906    peerCount = tr_ptrArraySize( &t->peers );
907    peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
908    for( i=0; i<peerCount; ++i )
909        if( peers[i]->msgs )
910            tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
911}
912
913static void
914addStrike( Torrent * t,
915           tr_peer * peer )
916{
917    tordbg( t, "increasing peer %s strike count to %d",
918            tr_peerIoAddrStr( &peer->addr,
919                              peer->port ), peer->strikes + 1 );
920
921    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
922    {
923        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
924        atom->myflags |= MYFLAG_BANNED;
925        peer->doPurge = 1;
926        tordbg( t, "banning peer %s", tr_peerIoAddrStr( &atom->addr, atom->port ) );
927    }
928}
929
930static void
931gotBadPiece( Torrent *        t,
932             tr_piece_index_t pieceIndex )
933{
934    tr_torrent *   tor = t->tor;
935    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
936
937    tor->corruptCur += byteCount;
938    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
939}
940
941static void
942refillSoon( Torrent * t )
943{
944    if( t->refillTimer == NULL )
945        t->refillTimer = tr_timerNew( t->manager->session,
946                                      refillPulse, t,
947                                      REFILL_PERIOD_MSEC );
948}
949
950static void
951peerSuggestedPiece( Torrent            * t UNUSED,
952                    tr_peer            * peer UNUSED,
953                    tr_piece_index_t     pieceIndex UNUSED,
954                    int                  isFastAllowed UNUSED )
955{
956#if 0
957    assert( t );
958    assert( peer );
959    assert( peer->msgs );
960
961    /* is this a valid piece? */
962    if(  pieceIndex >= t->tor->info.pieceCount )
963        return;
964
965    /* don't ask for it if we've already got it */
966    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
967        return;
968
969    /* don't ask for it if they don't have it */
970    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
971        return;
972
973    /* don't ask for it if we're choked and it's not fast */
974    if( !isFastAllowed && peer->clientIsChoked )
975        return;
976
977    /* request the blocks that we don't have in this piece */
978    {
979        tr_block_index_t block;
980        const tr_torrent * tor = t->tor;
981        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
982        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
983
984        for( block=start; block<end; ++block )
985        {
986            if( !tr_cpBlockIsComplete( tor->completion, block ) )
987            {
988                const uint32_t offset = getBlockOffsetInPiece( tor, block );
989                const uint32_t length = tr_torBlockCountBytes( tor, block );
990                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
991                incrementPieceRequests( t, pieceIndex );
992            }
993        }
994    }
995#endif
996}
997
998static void
999peerCallbackFunc( void * vpeer, void * vevent, void * vt )
1000{
1001    tr_peer * peer = vpeer; /* may be NULL if peer is a webseed */
1002    Torrent * t = vt;
1003    const tr_peer_event * e = vevent;
1004
1005    torrentLock( t );
1006
1007    switch( e->eventType )
1008    {
1009        case TR_PEER_UPLOAD_ONLY:
1010            /* update our atom */
1011            if( peer ) {
1012                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1013                a->uploadOnly = e->uploadOnly ? UPLOAD_ONLY_YES : UPLOAD_ONLY_NO;
1014            }
1015            break;
1016
1017        case TR_PEER_NEED_REQ:
1018            refillSoon( t );
1019            break;
1020
1021        case TR_PEER_CANCEL:
1022            decrementPieceRequests( t, e->pieceIndex );
1023            break;
1024
1025        case TR_PEER_PEER_GOT_DATA:
1026        {
1027            const time_t now = time( NULL );
1028            tr_torrent * tor = t->tor;
1029
1030            tor->activityDate = now;
1031
1032            if( e->wasPieceData )
1033                tor->uploadedCur += e->length;
1034
1035            /* update the stats */
1036            if( e->wasPieceData )
1037                tr_statsAddUploaded( tor->session, e->length );
1038
1039            /* update our atom */
1040            if( peer ) {
1041                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1042                if( e->wasPieceData )
1043                    a->piece_data_time = now;
1044            }
1045
1046            break;
1047        }
1048
1049        case TR_PEER_CLIENT_GOT_SUGGEST:
1050            if( peer )
1051                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1052            break;
1053
1054        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1055            if( peer )
1056                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1057            break;
1058
1059        case TR_PEER_CLIENT_GOT_DATA:
1060        {
1061            const time_t now = time( NULL );
1062            tr_torrent * tor = t->tor;
1063            tor->activityDate = now;
1064
1065            /* only add this to downloadedCur if we got it from a peer --
1066             * webseeds shouldn't count against our ratio.  As one tracker
1067             * admin put it, "Those pieces are downloaded directly from the
1068             * content distributor, not the peers, it is the tracker's job
1069             * to manage the swarms, not the web server and does not fit
1070             * into the jurisdiction of the tracker." */
1071            if( peer && e->wasPieceData )
1072                tor->downloadedCur += e->length;
1073
1074            /* update the stats */ 
1075            if( e->wasPieceData )
1076                tr_statsAddDownloaded( tor->session, e->length );
1077
1078            /* update our atom */
1079            if( peer ) {
1080                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1081                if( e->wasPieceData )
1082                    a->piece_data_time = now;
1083            }
1084
1085            break;
1086        }
1087
1088        case TR_PEER_PEER_PROGRESS:
1089        {
1090            if( peer )
1091            {
1092                struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1093                const int peerIsSeed = e->progress >= 1.0;
1094                if( peerIsSeed ) {
1095                    tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1096                    atom->flags |= ADDED_F_SEED_FLAG;
1097                } else {
1098                    tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1099                    atom->flags &= ~ADDED_F_SEED_FLAG;
1100                }
1101            }
1102            break;
1103        }
1104
1105        case TR_PEER_CLIENT_GOT_BLOCK:
1106        {
1107            tr_torrent * tor = t->tor;
1108
1109            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1110
1111            tr_cpBlockAdd( &tor->completion, block );
1112            decrementPieceRequests( t, e->pieceIndex );
1113
1114            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1115
1116            if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1117            {
1118                const tr_piece_index_t p = e->pieceIndex;
1119                const tr_bool ok = tr_ioTestPiece( tor, p, NULL, 0 );
1120
1121                if( !ok )
1122                {
1123                    tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1124                               (unsigned long)p );
1125                }
1126
1127                tr_torrentSetHasPiece( tor, p, ok );
1128                tr_torrentSetPieceChecked( tor, p, TRUE );
1129                tr_peerMgrSetBlame( tor, p, ok );
1130
1131                if( !ok )
1132                {
1133                    gotBadPiece( t, p );
1134                }
1135                else
1136                {
1137                    int i;
1138                    int peerCount;
1139                    tr_peer ** peers;
1140                    tr_file_index_t fileIndex;
1141
1142                    peerCount = tr_ptrArraySize( &t->peers );
1143                    peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1144                    for( i=0; i<peerCount; ++i )
1145                        tr_peerMsgsHave( peers[i]->msgs, p );
1146
1147                    for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex )
1148                    {
1149                        const tr_file * file = &tor->info.files[fileIndex];
1150                        if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) && tr_cpFileIsComplete( &tor->completion, fileIndex ) )
1151                        {
1152                            char * path = tr_buildPath( tor->downloadDir, file->name, NULL );
1153                            tordbg( t, "closing recently-completed file \"%s\"", path );
1154                            tr_fdFileClose( path );
1155                            tr_free( path );
1156                        }
1157                    }
1158                }
1159
1160                tr_torrentRecheckCompleteness( tor );
1161            }
1162            break;
1163        }
1164
1165        case TR_PEER_ERROR:
1166            if( e->err == EINVAL )
1167            {
1168                addStrike( t, peer );
1169                peer->doPurge = 1;
1170                tordbg( t, "setting %s doPurge flag because we got an EINVAL error", tr_peerIoAddrStr( &peer->addr, peer->port ) );
1171            }
1172            else if( ( e->err == ERANGE )
1173                  || ( e->err == EMSGSIZE )
1174                  || ( e->err == ENOTCONN ) )
1175            {
1176                /* some protocol error from the peer */
1177                peer->doPurge = 1;
1178                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error", tr_peerIoAddrStr( &peer->addr, peer->port ) );
1179            }
1180            else /* a local error, such as an IO error */
1181            {
1182                t->tor->error = e->err;
1183                tr_strlcpy( t->tor->errorString,
1184                            tr_strerror( t->tor->error ),
1185                            sizeof( t->tor->errorString ) );
1186                tr_torrentStop( t->tor );
1187            }
1188            break;
1189
1190        default:
1191            assert( 0 );
1192    }
1193
1194    torrentUnlock( t );
1195}
1196
1197static void
1198ensureAtomExists( Torrent          * t,
1199                  const tr_address * addr,
1200                  tr_port            port,
1201                  uint8_t            flags,
1202                  uint8_t            from )
1203{
1204    if( getExistingAtom( t, addr ) == NULL )
1205    {
1206        struct peer_atom * a;
1207        a = tr_new0( struct peer_atom, 1 );
1208        a->addr = *addr;
1209        a->port = port;
1210        a->flags = flags;
1211        a->from = from;
1212        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr( &a->addr, a->port ) );
1213        tr_ptrArrayInsertSorted( &t->pool, a, comparePeerAtoms );
1214    }
1215}
1216
1217static int
1218getMaxPeerCount( const tr_torrent * tor )
1219{
1220    return tor->maxConnectedPeers;
1221}
1222
1223static int
1224getPeerCount( const Torrent * t )
1225{
1226    return tr_ptrArraySize( &t->peers );// + tr_ptrArraySize( &t->outgoingHandshakes );
1227}
1228
1229/* FIXME: this is kind of a mess. */
1230static tr_bool
1231myHandshakeDoneCB( tr_handshake  * handshake,
1232                   tr_peerIo     * io,
1233                   tr_bool         isConnected,
1234                   const uint8_t * peer_id,
1235                   void          * vmanager )
1236{
1237    tr_bool            ok = isConnected;
1238    tr_bool            success = FALSE;
1239    tr_port            port;
1240    const tr_address * addr;
1241    tr_peerMgr       * manager = vmanager;
1242    Torrent          * t;
1243    tr_handshake     * ours;
1244
1245    assert( io );
1246    assert( tr_isBool( ok ) );
1247
1248    t = tr_peerIoHasTorrentHash( io )
1249        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1250        : NULL;
1251
1252    if( tr_peerIoIsIncoming ( io ) )
1253        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1254                                        handshake, handshakeCompare );
1255    else if( t )
1256        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1257                                        handshake, handshakeCompare );
1258    else
1259        ours = handshake;
1260
1261    assert( ours );
1262    assert( ours == handshake );
1263
1264    if( t )
1265        torrentLock( t );
1266
1267    addr = tr_peerIoGetAddress( io, &port );
1268
1269    if( !ok || !t || !t->isRunning )
1270    {
1271        if( t )
1272        {
1273            struct peer_atom * atom = getExistingAtom( t, addr );
1274            if( atom )
1275                ++atom->numFails;
1276        }
1277    }
1278    else /* looking good */
1279    {
1280        struct peer_atom * atom;
1281        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1282        atom = getExistingAtom( t, addr );
1283        atom->time = time( NULL );
1284        atom->piece_data_time = 0;
1285
1286        if( atom->myflags & MYFLAG_BANNED )
1287        {
1288            tordbg( t, "banned peer %s tried to reconnect",
1289                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
1290        }
1291        else if( tr_peerIoIsIncoming( io )
1292               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1293
1294        {
1295        }
1296        else
1297        {
1298            tr_peer * peer = getExistingPeer( t, addr );
1299
1300            if( peer ) /* we already have this peer */
1301            {
1302            }
1303            else
1304            {
1305                peer = getPeer( t, addr );
1306                tr_free( peer->client );
1307
1308                if( !peer_id )
1309                    peer->client = NULL;
1310                else {
1311                    char client[128];
1312                    tr_clientForId( client, sizeof( client ), peer_id );
1313                    peer->client = tr_strdup( client );
1314                }
1315
1316                peer->port = port;
1317                peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is
1318                                                                balanced by our unref in peerDestructor()  */
1319                tr_peerIoSetParent( peer->io, t->tor->bandwidth );
1320                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1321
1322                success = TRUE;
1323            }
1324        }
1325    }
1326
1327    if( t )
1328        torrentUnlock( t );
1329
1330    return success;
1331}
1332
1333void
1334tr_peerMgrAddIncoming( tr_peerMgr * manager,
1335                       tr_address * addr,
1336                       tr_port      port,
1337                       int          socket )
1338{
1339    managerLock( manager );
1340
1341    if( tr_sessionIsAddressBlocked( manager->session, addr ) )
1342    {
1343        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
1344        tr_netClose( socket );
1345    }
1346    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
1347    {
1348        tr_netClose( socket );
1349    }
1350    else /* we don't have a connetion to them yet... */
1351    {
1352        tr_peerIo *    io;
1353        tr_handshake * handshake;
1354
1355        io = tr_peerIoNewIncoming( manager->session, manager->session->bandwidth, addr, port, socket );
1356
1357        handshake = tr_handshakeNew( io,
1358                                     manager->session->encryptionMode,
1359                                     myHandshakeDoneCB,
1360                                     manager );
1361
1362        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
1363
1364        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
1365                                 handshakeCompare );
1366    }
1367
1368    managerUnlock( manager );
1369}
1370
1371static tr_bool
1372tr_isPex( const tr_pex * pex )
1373{
1374    return pex && tr_isAddress( &pex->addr );
1375}
1376
1377void
1378tr_peerMgrAddPex( tr_torrent   *  tor,
1379                  uint8_t         from,
1380                  const tr_pex *  pex )
1381{
1382    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
1383    {
1384        Torrent * t = tor->torrentPeers;
1385        managerLock( t->manager );
1386
1387        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
1388            if( tr_isValidPeerAddress( &pex->addr, pex->port ) )
1389                ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from );
1390
1391        managerUnlock( t->manager );
1392    }
1393}
1394
1395tr_pex *
1396tr_peerMgrCompactToPex( const void *    compact,
1397                        size_t          compactLen,
1398                        const uint8_t * added_f,
1399                        size_t          added_f_len,
1400                        size_t *        pexCount )
1401{
1402    size_t          i;
1403    size_t          n = compactLen / 6;
1404    const uint8_t * walk = compact;
1405    tr_pex *        pex = tr_new0( tr_pex, n );
1406
1407    for( i = 0; i < n; ++i )
1408    {
1409        pex[i].addr.type = TR_AF_INET;
1410        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
1411        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1412        if( added_f && ( n == added_f_len ) )
1413            pex[i].flags = added_f[i];
1414    }
1415
1416    *pexCount = n;
1417    return pex;
1418}
1419
1420tr_pex *
1421tr_peerMgrCompact6ToPex( const void    * compact,
1422                         size_t          compactLen,
1423                         const uint8_t * added_f,
1424                         size_t          added_f_len,
1425                         size_t        * pexCount )
1426{
1427    size_t          i;
1428    size_t          n = compactLen / 18;
1429    const uint8_t * walk = compact;
1430    tr_pex *        pex = tr_new0( tr_pex, n );
1431   
1432    for( i = 0; i < n; ++i )
1433    {
1434        pex[i].addr.type = TR_AF_INET6;
1435        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
1436        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1437        if( added_f && ( n == added_f_len ) )
1438            pex[i].flags = added_f[i];
1439    }
1440   
1441    *pexCount = n;
1442    return pex;
1443}
1444
1445tr_pex *
1446tr_peerMgrArrayToPex( const void * array,
1447                      size_t       arrayLen,
1448                      size_t      * pexCount )
1449{
1450    size_t          i;
1451    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
1452    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
1453    const uint8_t * walk = array;
1454    tr_pex        * pex = tr_new0( tr_pex, n );
1455   
1456    for( i = 0 ; i < n ; i++ ) {
1457        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
1458        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
1459        pex[i].flags = 0x00;
1460        walk += sizeof( tr_address ) + 2;
1461    }
1462   
1463    *pexCount = n;
1464    return pex;
1465}
1466
1467/**
1468***
1469**/
1470
1471void
1472tr_peerMgrSetBlame( tr_torrent     * tor,
1473                    tr_piece_index_t pieceIndex,
1474                    int              success )
1475{
1476    if( !success )
1477    {
1478        int        peerCount, i;
1479        Torrent *  t = tor->torrentPeers;
1480        tr_peer ** peers;
1481
1482        assert( torrentIsLocked( t ) );
1483
1484        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
1485        for( i = 0; i < peerCount; ++i )
1486        {
1487            tr_peer * peer = peers[i];
1488            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1489            {
1490                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1491                        tr_peerIoAddrStr( &peer->addr, peer->port ),
1492                        pieceIndex, (int)peer->strikes + 1 );
1493                addStrike( t, peer );
1494            }
1495        }
1496    }
1497}
1498
1499int
1500tr_pexCompare( const void * va, const void * vb )
1501{
1502    const tr_pex * a = va;
1503    const tr_pex * b = vb;
1504    int i;
1505
1506    assert( tr_isPex( a ) );
1507    assert( tr_isPex( b ) );
1508
1509    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
1510        return i;
1511
1512    if( a->port != b->port )
1513        return a->port < b->port ? -1 : 1;
1514
1515    return 0;
1516}
1517
1518static int
1519peerPrefersCrypto( const tr_peer * peer )
1520{
1521    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1522        return TRUE;
1523
1524    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1525        return FALSE;
1526
1527    return tr_peerIoIsEncrypted( peer->io );
1528}
1529
1530int
1531tr_peerMgrGetPeers( tr_torrent      * tor,
1532                    tr_pex         ** setme_pex,
1533                    uint8_t           af)
1534{
1535    int peersReturning = 0;
1536    const Torrent * t = tor->torrentPeers;
1537
1538    managerLock( t->manager );
1539
1540    {
1541        int i;
1542        const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1543        const int peerCount = tr_ptrArraySize( &t->peers );
1544        /* for now, this will waste memory on torrents that have both
1545         * ipv6 and ipv4 peers */
1546        tr_pex * pex = tr_new( tr_pex, peerCount );
1547        tr_pex * walk = pex;
1548
1549        for( i=0; i<peerCount; ++i )
1550        {
1551            const tr_peer * peer = peers[i];
1552            if( peer->addr.type == af )
1553            {
1554                const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1555
1556                assert( tr_isAddress( &peer->addr ) );
1557                walk->addr = peer->addr;
1558                walk->port = peer->port;
1559                walk->flags = 0;
1560                if( peerPrefersCrypto( peer ) )
1561                    walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1562                if( ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 ) )
1563                    walk->flags |= ADDED_F_SEED_FLAG;
1564                ++peersReturning;
1565                ++walk;
1566            }
1567        }
1568
1569        assert( ( walk - pex ) == peersReturning );
1570        qsort( pex, peersReturning, sizeof( tr_pex ), tr_pexCompare );
1571        *setme_pex = pex;
1572    }
1573
1574    managerUnlock( t->manager );
1575    return peersReturning;
1576}
1577
1578void
1579tr_peerMgrStartTorrent( tr_torrent * tor )
1580{
1581    Torrent * t = tor->torrentPeers;
1582
1583    managerLock( t->manager );
1584
1585    assert( t );
1586
1587    if( !t->isRunning )
1588    {
1589        t->isRunning = TRUE;
1590
1591        if( !tr_ptrArrayEmpty( &t->webseeds ) )
1592            refillSoon( t );
1593    }
1594
1595    managerUnlock( t->manager );
1596}
1597
1598static void
1599stopTorrent( Torrent * t )
1600{
1601    assert( torrentIsLocked( t ) );
1602
1603    t->isRunning = FALSE;
1604
1605    /* disconnect the peers. */
1606    tr_ptrArrayForeach( &t->peers, (PtrArrayForeachFunc)peerDestructor );
1607    tr_ptrArrayClear( &t->peers );
1608
1609    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1610     * which removes the handshake from t->outgoingHandshakes... */
1611    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
1612        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
1613}
1614
1615void
1616tr_peerMgrStopTorrent( tr_torrent * tor )
1617{
1618    Torrent * t = tor->torrentPeers;
1619
1620    managerLock( t->manager );
1621
1622    stopTorrent( t );
1623
1624    managerUnlock( t->manager );
1625}
1626
1627void
1628tr_peerMgrAddTorrent( tr_peerMgr * manager,
1629                      tr_torrent * tor )
1630{
1631    managerLock( manager );
1632
1633    assert( tor );
1634    assert( tor->torrentPeers == NULL );
1635
1636    tor->torrentPeers = torrentConstructor( manager, tor );
1637
1638    managerUnlock( manager );
1639}
1640
1641void
1642tr_peerMgrRemoveTorrent( tr_torrent * tor )
1643{
1644    tr_torrentLock( tor );
1645
1646    stopTorrent( tor->torrentPeers );
1647    torrentDestructor( tor->torrentPeers );
1648
1649    tr_torrentUnlock( tor );
1650}
1651
1652void
1653tr_peerMgrTorrentAvailability( const tr_torrent * tor,
1654                               int8_t           * tab,
1655                               unsigned int       tabCount )
1656{
1657    tr_piece_index_t   i;
1658    const Torrent *    t;
1659    float              interval;
1660    tr_bool            isSeed;
1661    int                peerCount;
1662    const tr_peer **   peers;
1663    tr_torrentLock( tor );
1664
1665    t = tor->torrentPeers;
1666    tor = t->tor;
1667    interval = tor->info.pieceCount / (float)tabCount;
1668    isSeed = tor && ( tr_cpGetStatus ( &tor->completion ) == TR_SEED );
1669    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1670    peerCount = tr_ptrArraySize( &t->peers );
1671
1672    memset( tab, 0, tabCount );
1673
1674    for( i = 0; tor && i < tabCount; ++i )
1675    {
1676        const int piece = i * interval;
1677
1678        if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
1679            tab[i] = -1;
1680        else if( peerCount ) {
1681            int j;
1682            for( j = 0; j < peerCount; ++j )
1683                if( tr_bitfieldHas( peers[j]->have, i ) )
1684                    ++tab[i];
1685        }
1686    }
1687
1688    tr_torrentUnlock( tor );
1689}
1690
1691/* Returns the pieces that are available from peers */
1692tr_bitfield*
1693tr_peerMgrGetAvailable( const tr_torrent * tor )
1694{
1695    int i;
1696    int peerCount;
1697    Torrent * t = tor->torrentPeers;
1698    const tr_peer ** peers;
1699    tr_bitfield * pieces;
1700    managerLock( t->manager );
1701
1702    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1703    peerCount = tr_ptrArraySize( &t->peers );
1704    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1705    for( i=0; i<peerCount; ++i )
1706        tr_bitfieldOr( pieces, peers[i]->have );
1707
1708    managerUnlock( t->manager );
1709    return pieces;
1710}
1711
1712void
1713tr_peerMgrTorrentStats( tr_torrent       * tor,
1714                        int              * setmePeersKnown,
1715                        int              * setmePeersConnected,
1716                        int              * setmeSeedsConnected,
1717                        int              * setmeWebseedsSendingToUs,
1718                        int              * setmePeersSendingToUs,
1719                        int              * setmePeersGettingFromUs,
1720                        int              * setmePeersFrom )
1721{
1722    int i, size;
1723    const Torrent * t = tor->torrentPeers;
1724    const tr_peer ** peers;
1725    const tr_webseed ** webseeds;
1726
1727    managerLock( t->manager );
1728
1729    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1730    size = tr_ptrArraySize( &t->peers );
1731
1732    *setmePeersKnown           = tr_ptrArraySize( &t->pool );
1733    *setmePeersConnected       = 0;
1734    *setmeSeedsConnected       = 0;
1735    *setmePeersGettingFromUs   = 0;
1736    *setmePeersSendingToUs     = 0;
1737    *setmeWebseedsSendingToUs  = 0;
1738
1739    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1740        setmePeersFrom[i] = 0;
1741
1742    for( i=0; i<size; ++i )
1743    {
1744        const tr_peer * peer = peers[i];
1745        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1746
1747        if( peer->io == NULL ) /* not connected */
1748            continue;
1749
1750        ++*setmePeersConnected;
1751
1752        ++setmePeersFrom[atom->from];
1753
1754        if( clientIsDownloadingFrom( peer ) )
1755            ++*setmePeersSendingToUs;
1756
1757        if( clientIsUploadingTo( peer ) )
1758            ++*setmePeersGettingFromUs;
1759
1760        if( atom->flags & ADDED_F_SEED_FLAG )
1761            ++*setmeSeedsConnected;
1762    }
1763
1764    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1765    size = tr_ptrArraySize( &t->webseeds );
1766    for( i=0; i<size; ++i )
1767        if( tr_webseedIsActive( webseeds[i] ) )
1768            ++*setmeWebseedsSendingToUs;
1769
1770    managerUnlock( t->manager );
1771}
1772
1773float*
1774tr_peerMgrWebSpeeds( const tr_torrent * tor )
1775{
1776    const Torrent * t = tor->torrentPeers;
1777    const tr_webseed ** webseeds;
1778    int i;
1779    int webseedCount;
1780    float * ret;
1781    uint64_t now;
1782
1783    assert( t->manager );
1784    managerLock( t->manager );
1785
1786    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1787    webseedCount = tr_ptrArraySize( &t->webseeds );
1788    assert( webseedCount == tor->info.webseedCount );
1789    ret = tr_new0( float, webseedCount );
1790    now = tr_date( );
1791
1792    for( i=0; i<webseedCount; ++i )
1793        if( !tr_webseedGetSpeed( webseeds[i], now, &ret[i] ) )
1794            ret[i] = -1.0;
1795
1796    managerUnlock( t->manager );
1797    return ret;
1798}
1799
1800double
1801tr_peerGetPieceSpeed( const tr_peer * peer, uint64_t now, tr_direction direction )
1802{
1803    return peer->io ? tr_peerIoGetPieceSpeed( peer->io, now, direction ) : 0.0;
1804}
1805
1806
1807struct tr_peer_stat *
1808tr_peerMgrPeerStats( const tr_torrent    * tor,
1809                     int                 * setmeCount )
1810{
1811    int i, size;
1812    const Torrent * t = tor->torrentPeers;
1813    const tr_peer ** peers;
1814    tr_peer_stat * ret;
1815    uint64_t now;
1816
1817    assert( t->manager );
1818    managerLock( t->manager );
1819
1820    size = tr_ptrArraySize( &t->peers );
1821    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1822    ret = tr_new0( tr_peer_stat, size );
1823    now = tr_date( );
1824
1825    for( i = 0; i < size; ++i )
1826    {
1827        char *                   pch;
1828        const tr_peer *          peer = peers[i];
1829        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1830        tr_peer_stat *           stat = ret + i;
1831
1832        tr_ntop( &peer->addr, stat->addr, sizeof( stat->addr ) );
1833        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
1834                   sizeof( stat->client ) );
1835        stat->port               = ntohs( peer->port );
1836        stat->from               = atom->from;
1837        stat->progress           = peer->progress;
1838        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1839        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER );
1840        stat->rateToClient       = tr_peerGetPieceSpeed( peer, now, TR_PEER_TO_CLIENT );
1841        stat->peerIsChoked       = peer->peerIsChoked;
1842        stat->peerIsInterested   = peer->peerIsInterested;
1843        stat->clientIsChoked     = peer->clientIsChoked;
1844        stat->clientIsInterested = peer->clientIsInterested;
1845        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1846        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1847        stat->isUploadingTo      = clientIsUploadingTo( peer );
1848        stat->isSeed             = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
1849
1850        pch = stat->flagStr;
1851        if( t->optimistic == peer ) *pch++ = 'O';
1852        if( stat->isDownloadingFrom ) *pch++ = 'D';
1853        else if( stat->clientIsInterested ) *pch++ = 'd';
1854        if( stat->isUploadingTo ) *pch++ = 'U';
1855        else if( stat->peerIsInterested ) *pch++ = 'u';
1856        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
1857        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1858        if( stat->isEncrypted ) *pch++ = 'E';
1859        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1860        if( stat->isIncoming ) *pch++ = 'I';
1861        *pch = '\0';
1862    }
1863
1864    *setmeCount = size;
1865
1866    managerUnlock( t->manager );
1867    return ret;
1868}
1869
1870/**
1871***
1872**/
1873
1874struct ChokeData
1875{
1876    tr_bool         doUnchoke;
1877    tr_bool         isInterested;
1878    tr_bool         isChoked;
1879    int             rate;
1880    tr_peer *       peer;
1881};
1882
1883static int
1884compareChoke( const void * va,
1885              const void * vb )
1886{
1887    const struct ChokeData * a = va;
1888    const struct ChokeData * b = vb;
1889
1890    if( a->rate != b->rate ) /* prefer higher overall speeds */
1891        return a->rate > b->rate ? -1 : 1;
1892
1893    if( a->isChoked != b->isChoked ) /* prefer unchoked */
1894        return a->isChoked ? 1 : -1;
1895
1896    return 0;
1897}
1898
1899static int
1900isNew( const tr_peer * peer )
1901{
1902    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1903}
1904
1905static int
1906isSame( const tr_peer * peer )
1907{
1908    return peer && peer->client && strstr( peer->client, "Transmission" );
1909}
1910
1911/**
1912***
1913**/
1914
1915static void
1916rechokeTorrent( Torrent * t )
1917{
1918    int i, size, unchokedInterested;
1919    const int peerCount = tr_ptrArraySize( &t->peers );
1920    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1921    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1922    const tr_session * session = t->manager->session;
1923    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
1924    const uint64_t now = tr_date( );
1925
1926    assert( torrentIsLocked( t ) );
1927
1928    /* sort the peers by preference and rate */
1929    for( i = 0, size = 0; i < peerCount; ++i )
1930    {
1931        tr_peer * peer = peers[i];
1932        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1933
1934        if( peer->progress >= 1.0 ) /* choke all seeds */
1935        {
1936            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1937        }
1938        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
1939        {
1940            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1941        }
1942        else if( chokeAll ) /* choke everyone if we're not uploading */
1943        {
1944            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1945        }
1946        else
1947        {
1948            struct ChokeData * n = &choke[size++];
1949            n->peer         = peer;
1950            n->isInterested = peer->peerIsInterested;
1951            n->isChoked     = peer->peerIsChoked;
1952            n->rate         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER ) * 1024;
1953        }
1954    }
1955
1956    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
1957
1958    /**
1959     * Reciprocation and number of uploads capping is managed by unchoking
1960     * the N peers which have the best upload rate and are interested.
1961     * This maximizes the client's download rate. These N peers are
1962     * referred to as downloaders, because they are interested in downloading
1963     * from the client.
1964     *
1965     * Peers which have a better upload rate (as compared to the downloaders)
1966     * but aren't interested get unchoked. If they become interested, the
1967     * downloader with the worst upload rate gets choked. If a client has
1968     * a complete file, it uses its upload rate rather than its download
1969     * rate to decide which peers to unchoke.
1970     */
1971    unchokedInterested = 0;
1972    for( i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i ) {
1973        choke[i].doUnchoke = 1;
1974        if( choke[i].isInterested )
1975            ++unchokedInterested;
1976    }
1977
1978    /* optimistic unchoke */
1979    if( i < size )
1980    {
1981        int n;
1982        struct ChokeData * c;
1983        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
1984
1985        for( ; i<size; ++i )
1986        {
1987            if( choke[i].isInterested )
1988            {
1989                const tr_peer * peer = choke[i].peer;
1990                int x = 1, y;
1991                if( isNew( peer ) ) x *= 3;
1992                if( isSame( peer ) ) x *= 3;
1993                for( y=0; y<x; ++y )
1994                    tr_ptrArrayAppend( &randPool, &choke[i] );
1995            }
1996        }
1997
1998        if(( n = tr_ptrArraySize( &randPool )))
1999        {
2000            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
2001            c->doUnchoke = 1;
2002            t->optimistic = c->peer;
2003        }
2004
2005        tr_ptrArrayDestruct( &randPool, NULL );
2006    }
2007
2008    for( i=0; i<size; ++i )
2009        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
2010
2011    /* cleanup */
2012    tr_free( choke );
2013}
2014
2015static int
2016rechokePulse( void * vmgr )
2017{
2018    tr_torrent * tor = NULL;
2019    tr_peerMgr * mgr = vmgr;
2020    managerLock( mgr );
2021
2022    while(( tor = tr_torrentNext( mgr->session, tor )))
2023        if( tor->isRunning )
2024            rechokeTorrent( tor->torrentPeers );
2025
2026    managerUnlock( mgr );
2027    return TRUE;
2028}
2029
2030/***
2031****
2032****  Life and Death
2033****
2034***/
2035
2036typedef enum
2037{
2038    TR_CAN_KEEP,
2039    TR_CAN_CLOSE,
2040    TR_MUST_CLOSE,
2041}
2042tr_close_type_t;
2043
2044static tr_close_type_t
2045shouldPeerBeClosed( const Torrent    * t,
2046                    const tr_peer    * peer,
2047                    int                peerCount )
2048{
2049    const tr_torrent *       tor = t->tor;
2050    const time_t             now = time( NULL );
2051    const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
2052
2053    /* if it's marked for purging, close it */
2054    if( peer->doPurge )
2055    {
2056        tordbg( t, "purging peer %s because its doPurge flag is set",
2057                tr_peerIoAddrStr( &atom->addr, atom->port ) );
2058        return TR_MUST_CLOSE;
2059    }
2060
2061    /* if we're seeding and the peer has everything we have,
2062     * and enough time has passed for a pex exchange, then disconnect */
2063    if( tr_torrentIsSeed( tor ) )
2064    {
2065        int peerHasEverything;
2066        if( atom->flags & ADDED_F_SEED_FLAG )
2067            peerHasEverything = TRUE;
2068        else if( peer->progress < tr_cpPercentDone( &tor->completion ) )
2069            peerHasEverything = FALSE;
2070        else {
2071            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( &tor->completion ) );
2072            tr_bitfieldDifference( tmp, peer->have );
2073            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2074            tr_bitfieldFree( tmp );
2075        }
2076
2077        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 )))
2078        {
2079            tordbg( t, "purging peer %s because we're both seeds",
2080                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
2081            return TR_MUST_CLOSE;
2082        }
2083    }
2084
2085    /* disconnect if it's been too long since piece data has been transferred.
2086     * this is on a sliding scale based on number of available peers... */
2087    {
2088        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2089        /* if we have >= relaxIfFewerThan, strictness is 100%.
2090         * if we have zero connections, strictness is 0% */
2091        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
2092                               ? 1.0
2093                               : peerCount / (float)relaxStrictnessIfFewerThanN;
2094        const int lo = MIN_UPLOAD_IDLE_SECS;
2095        const int hi = MAX_UPLOAD_IDLE_SECS;
2096        const int limit = hi - ( ( hi - lo ) * strictness );
2097        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
2098/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
2099        if( idleTime > limit ) {
2100            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
2101                       tr_peerIoAddrStr( &atom->addr, atom->port ), idleTime );
2102            return TR_CAN_CLOSE;
2103        }
2104    }
2105
2106    return TR_CAN_KEEP;
2107}
2108
2109static tr_peer **
2110getPeersToClose( Torrent * t, tr_close_type_t closeType, int * setmeSize )
2111{
2112    int i, peerCount, outsize;
2113    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
2114    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2115
2116    assert( torrentIsLocked( t ) );
2117
2118    for( i = outsize = 0; i < peerCount; ++i )
2119        if( shouldPeerBeClosed( t, peers[i], peerCount ) == closeType )
2120            ret[outsize++] = peers[i];
2121
2122    *setmeSize = outsize;
2123    return ret;
2124}
2125
2126static int
2127compareCandidates( const void * va,
2128                   const void * vb )
2129{
2130    const struct peer_atom * a = *(const struct peer_atom**) va;
2131    const struct peer_atom * b = *(const struct peer_atom**) vb;
2132
2133    /* <Charles> Here we would probably want to try reconnecting to
2134     * peers that had most recently given us data. Lots of users have
2135     * trouble with resets due to their routers and/or ISPs. This way we
2136     * can quickly recover from an unwanted reset. So we sort
2137     * piece_data_time in descending order.
2138     */
2139
2140    if( a->piece_data_time != b->piece_data_time )
2141        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2142
2143    if( a->numFails != b->numFails )
2144        return a->numFails < b->numFails ? -1 : 1;
2145
2146    if( a->time != b->time )
2147        return a->time < b->time ? -1 : 1;
2148
2149    /* all other things being equal, prefer peers whose
2150     * information comes from a more reliable source */
2151    if( a->from != b->from )
2152        return a->from < b->from ? -1 : 1;
2153
2154    return 0;
2155}
2156
2157static int
2158getReconnectIntervalSecs( const struct peer_atom * atom )
2159{
2160    int          sec;
2161    const time_t now = time( NULL );
2162
2163    /* if we were recently connected to this peer and transferring piece
2164     * data, try to reconnect to them sooner rather that later -- we don't
2165     * want network troubles to get in the way of a good peer. */
2166    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2167        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2168
2169    /* don't allow reconnects more often than our minimum */
2170    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2171        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2172
2173    /* otherwise, the interval depends on how many times we've tried
2174     * and failed to connect to the peer */
2175    else switch( atom->numFails ) {
2176        case 0: sec = 0; break;
2177        case 1: sec = 5; break;
2178        case 2: sec = 2 * 60; break;
2179        case 3: sec = 15 * 60; break;
2180        case 4: sec = 30 * 60; break;
2181        case 5: sec = 60 * 60; break;
2182        default: sec = 120 * 60; break;
2183    }
2184
2185    return sec;
2186}
2187
2188static struct peer_atom **
2189getPeerCandidates( Torrent * t, int * setmeSize )
2190{
2191    int                 i, atomCount, retCount;
2192    struct peer_atom ** atoms;
2193    struct peer_atom ** ret;
2194    const time_t        now = time( NULL );
2195    const int           seed = tr_torrentIsSeed( t->tor );
2196
2197    assert( torrentIsLocked( t ) );
2198
2199    atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
2200    ret = tr_new( struct peer_atom*, atomCount );
2201    for( i = retCount = 0; i < atomCount; ++i )
2202    {
2203        int                interval;
2204        struct peer_atom * atom = atoms[i];
2205
2206        /* peer fed us too much bad data ... we only keep it around
2207         * now to weed it out in case someone sends it to us via pex */
2208        if( atom->myflags & MYFLAG_BANNED )
2209            continue;
2210
2211        /* peer was unconnectable before, so we're not going to keep trying.
2212         * this is needs a separate flag from `banned', since if they try
2213         * to connect to us later, we'll let them in */
2214        if( atom->myflags & MYFLAG_UNREACHABLE )
2215            continue;
2216
2217        /* we don't need two connections to the same peer... */
2218        if( peerIsInUse( t, &atom->addr ) )
2219            continue;
2220
2221        /* no need to connect if we're both seeds... */
2222        if( seed && ( ( atom->flags & ADDED_F_SEED_FLAG ) ||
2223                      ( atom->uploadOnly == UPLOAD_ONLY_YES ) ) )
2224            continue;
2225
2226        /* don't reconnect too often */
2227        interval = getReconnectIntervalSecs( atom );
2228        if( ( now - atom->time ) < interval )
2229        {
2230            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2231                    i, tr_peerIoAddrStr( &atom->addr, atom->port ), interval );
2232            continue;
2233        }
2234
2235        /* Don't connect to peers in our blocklist */
2236        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2237            continue;
2238
2239        ret[retCount++] = atom;
2240    }
2241
2242    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2243    *setmeSize = retCount;
2244    return ret;
2245}
2246
2247static void
2248closePeer( Torrent * t, tr_peer * peer )
2249{
2250    struct peer_atom * atom;
2251
2252    assert( t != NULL );
2253    assert( peer != NULL );
2254
2255    /* if we transferred piece data, then they might be good peers,
2256       so reset their `numFails' weight to zero.  otherwise we connected
2257       to them fruitlessly, so mark it as another fail */
2258    atom = getExistingAtom( t, &peer->addr );
2259    if( atom->piece_data_time )
2260        atom->numFails = 0;
2261    else
2262        ++atom->numFails;
2263
2264    tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
2265    removePeer( t, peer );
2266}
2267
2268static void
2269reconnectTorrent( Torrent * t )
2270{
2271    static time_t prevTime = 0;
2272    static int    newConnectionsThisSecond = 0;
2273    time_t        now;
2274
2275    now = time( NULL );
2276    if( prevTime != now )
2277    {
2278        prevTime = now;
2279        newConnectionsThisSecond = 0;
2280    }
2281
2282    if( !t->isRunning )
2283    {
2284        removeAllPeers( t );
2285    }
2286    else
2287    {
2288        int i;
2289        int canCloseCount;
2290        int mustCloseCount;
2291        int candidateCount;
2292        int maxCandidates;
2293        struct tr_peer ** canClose = getPeersToClose( t, TR_CAN_CLOSE, &canCloseCount );
2294        struct tr_peer ** mustClose = getPeersToClose( t, TR_MUST_CLOSE, &mustCloseCount );
2295        struct peer_atom ** candidates = getPeerCandidates( t, &candidateCount );
2296
2297        tordbg( t, "reconnect pulse for [%s]: "
2298                   "%d must-close connections, "
2299                   "%d can-close connections, "
2300                   "%d connection candidates, "
2301                   "%d atoms, "
2302                   "max per pulse is %d",
2303                   t->tor->info.name,
2304                   mustCloseCount,
2305                   canCloseCount,
2306                   candidateCount,
2307                   tr_ptrArraySize( &t->pool ),
2308                   MAX_RECONNECTIONS_PER_PULSE );
2309
2310        /* disconnect the really bad peers */
2311        for( i=0; i<mustCloseCount; ++i )
2312            closePeer( t, mustClose[i] );
2313
2314        /* decide how many peers can we try to add in this pass */
2315        maxCandidates = candidateCount;
2316        maxCandidates = MIN( maxCandidates, MAX_RECONNECTIONS_PER_PULSE );
2317        maxCandidates = MIN( maxCandidates, getMaxPeerCount( t->tor ) - getPeerCount( t ) );
2318        maxCandidates = MIN( maxCandidates, MAX_CONNECTIONS_PER_SECOND - newConnectionsThisSecond );
2319
2320        /* maybe disconnect some lesser peers, if we have candidates to replace them with */
2321        for( i=0; ( i<canCloseCount ) && ( i<maxCandidates ); ++i )
2322            closePeer( t, canClose[i] );
2323
2324        tordbg( t, "candidateCount is %d, MAX_RECONNECTIONS_PER_PULSE is %d,"
2325                   " getPeerCount(t) is %d, getMaxPeerCount(t) is %d, "
2326                   "newConnectionsThisSecond is %d, MAX_CONNECTIONS_PER_SECOND is %d",
2327                   candidateCount,
2328                   MAX_RECONNECTIONS_PER_PULSE,
2329                   getPeerCount( t ),
2330                   getMaxPeerCount( t->tor ),
2331                   newConnectionsThisSecond, MAX_CONNECTIONS_PER_SECOND );
2332
2333        /* add some new ones */
2334        for( i=0; i<maxCandidates; ++i )
2335        {
2336            tr_peerMgr        * mgr = t->manager;
2337            struct peer_atom  * atom = candidates[i];
2338            tr_peerIo         * io;
2339
2340            tordbg( t, "Starting an OUTGOING connection with %s",
2341                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2342
2343            io = tr_peerIoNewOutgoing( mgr->session, mgr->session->bandwidth, &atom->addr, atom->port, t->hash );
2344
2345            if( io == NULL )
2346            {
2347                tordbg( t, "peerIo not created; marking peer %s as unreachable",
2348                        tr_peerIoAddrStr( &atom->addr, atom->port ) );
2349                atom->myflags |= MYFLAG_UNREACHABLE;
2350            }
2351            else
2352            {
2353                tr_handshake * handshake = tr_handshakeNew( io,
2354                                                            mgr->session->encryptionMode,
2355                                                            myHandshakeDoneCB,
2356                                                            mgr );
2357
2358                assert( tr_peerIoGetTorrentHash( io ) );
2359
2360                tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewOutgoing() */
2361
2362                ++newConnectionsThisSecond;
2363
2364                tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
2365                                         handshakeCompare );
2366            }
2367
2368            atom->time = time( NULL );
2369        }
2370
2371        /* cleanup */
2372        tr_free( candidates );
2373        tr_free( mustClose );
2374        tr_free( canClose );
2375    }
2376}
2377
2378static int
2379reconnectPulse( void * vmgr )
2380{
2381    tr_torrent * tor = NULL;
2382    tr_peerMgr * mgr = vmgr;
2383    managerLock( mgr );
2384
2385    while(( tor = tr_torrentNext( mgr->session, tor )))
2386        if( tor->isRunning )
2387            reconnectTorrent( tor->torrentPeers );
2388
2389    managerUnlock( mgr );
2390    return TRUE;
2391}
2392
2393/****
2394*****
2395*****  BANDWIDTH ALLOCATION
2396*****
2397****/
2398
2399static void
2400pumpAllPeers( tr_peerMgr * mgr )
2401{
2402    tr_torrent * tor = NULL;
2403
2404    while(( tor = tr_torrentNext( mgr->session, tor )))
2405    {
2406        int j;
2407        Torrent * t = tor->torrentPeers;
2408
2409        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
2410        {
2411            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
2412            tr_peerMsgsPulse( peer->msgs );
2413        }
2414    }
2415}
2416
2417static int
2418bandwidthPulse( void * vmgr )
2419{
2420    tr_peerMgr * mgr = vmgr;
2421    managerLock( mgr );
2422
2423    /* FIXME: this next line probably isn't necessary... */
2424    pumpAllPeers( mgr );
2425
2426    /* allocate bandwidth to the peers */
2427    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
2428    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
2429
2430    managerUnlock( mgr );
2431    return TRUE;
2432}
Note: See TracBrowser for help on using the repository browser.