source: trunk/libtransmission/peer-mgr.c @ 7892

Last change on this file since 7892 was 7892, checked in by livings124, 13 years ago

have the mac ui use libT's ratio settings (attempt 1); when seed ratio is reached in libT, set the seed ratio setting to "seed forever"

  • Property svn:keywords set to Date Rev Author Id
File size: 68.7 KB
Line 
1
2/*
3 * This file Copyright (C) 2007-2009 Charles Kerr <charles@transmissionbt.com>
4 *
5 * This file is licensed by the GPL version 2.  Works owned by the
6 * Transmission project are granted a special exemption to clause 2(b)
7 * so that the bulk of its code can remain under the MIT license.
8 * This exemption does not extend to derived works not owned by
9 * the Transmission project.
10 *
11 * $Id: peer-mgr.c 7892 2009-02-14 21:15:57Z livings124 $
12 */
13
14#include <assert.h>
15#include <errno.h>
16#include <string.h> /* memcpy, memcmp, strstr */
17#include <stdlib.h> /* qsort */
18#include <limits.h> /* INT_MAX */
19
20#include <event.h>
21
22#include "transmission.h"
23#include "session.h"
24#include "bandwidth.h"
25#include "bencode.h"
26#include "blocklist.h"
27#include "clients.h"
28#include "completion.h"
29#include "crypto.h"
30#include "fdlimit.h"
31#include "handshake.h"
32#include "inout.h" /* tr_ioTestPiece */
33#include "net.h"
34#include "peer-io.h"
35#include "peer-mgr.h"
36#include "peer-msgs.h"
37#include "ptrarray.h"
38#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
39#include "torrent.h"
40#include "trevent.h"
41#include "utils.h"
42#include "webseed.h"
43
44enum
45{
46    /* how frequently to change which peers are choked */
47    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
48
49    /* minimum interval for refilling peers' request lists */
50    REFILL_PERIOD_MSEC = 400,
51   
52    /* how frequently to reallocate bandwidth */
53    BANDWIDTH_PERIOD_MSEC = 500,
54
55    /* how frequently to decide which peers live and die */
56    RECONNECT_PERIOD_MSEC = 500,
57
58    /* when many peers are available, keep idle ones this long */
59    MIN_UPLOAD_IDLE_SECS = ( 30 ),
60
61    /* when few peers are available, keep idle ones this long */
62    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
63
64    /* max # of peers to ask fer per torrent per reconnect pulse */
65    MAX_RECONNECTIONS_PER_PULSE = 16,
66
67    /* max number of peers to ask for per second overall.
68    * this throttle is to avoid overloading the router */
69    MAX_CONNECTIONS_PER_SECOND = 32,
70
71    /* number of bad pieces a peer is allowed to send before we ban them */
72    MAX_BAD_PIECES_PER_PEER = 5,
73
74    /* use for bitwise operations w/peer_atom.myflags */
75    MYFLAG_BANNED = 1,
76
77    /* use for bitwise operations w/peer_atom.myflags */
78    /* unreachable for now... but not banned.
79     * if they try to connect to us it's okay */
80    MYFLAG_UNREACHABLE = 2,
81
82    /* the minimum we'll wait before attempting to reconnect to a peer */
83    MINIMUM_RECONNECT_INTERVAL_SECS = 5
84};
85
86
87/**
88***
89**/
90
91enum
92{
93    UPLOAD_ONLY_UKNOWN,
94    UPLOAD_ONLY_YES,
95    UPLOAD_ONLY_NO
96};
97
98/**
99 * Peer information that should be kept even before we've connected and
100 * after we've disconnected.  These are kept in a pool of peer_atoms to decide
101 * which ones would make good candidates for connecting to, and to watch out
102 * for banned peers.
103 *
104 * @see tr_peer
105 * @see tr_peermsgs
106 */
107struct peer_atom
108{
109    uint8_t     from;
110    uint8_t     flags;       /* these match the added_f flags */
111    uint8_t     myflags;     /* flags that aren't defined in added_f */
112    uint8_t     uploadOnly;  /* UPLOAD_ONLY_ */
113    tr_port     port;
114    uint16_t    numFails;
115    tr_address  addr;
116    time_t      time;        /* when the peer's connection status last changed */
117    time_t      piece_data_time;
118};
119
120typedef struct tr_torrent_peers
121{
122    tr_bool         isRunning;
123
124    uint8_t         hash[SHA_DIGEST_LENGTH];
125    int         *   pendingRequestCount;
126    tr_ptrArray     outgoingHandshakes; /* tr_handshake */
127    tr_ptrArray     pool; /* struct peer_atom */
128    tr_ptrArray     peers; /* tr_peer */
129    tr_ptrArray     webseeds; /* tr_webseed */
130    tr_timer *      refillTimer;
131    tr_torrent *    tor;
132    tr_peer *       optimistic; /* the optimistic peer, or NULL if none */
133
134    struct tr_peerMgr * manager;
135}
136Torrent;
137
138struct tr_peerMgr
139{
140    tr_session      * session;
141    tr_ptrArray       incomingHandshakes; /* tr_handshake */
142    tr_timer        * bandwidthTimer;
143    tr_timer        * rechokeTimer;
144    tr_timer        * reconnectTimer;
145};
146
147#define tordbg( t, ... ) \
148    do { \
149        if( tr_deepLoggingIsActive( ) ) \
150            tr_deepLog( __FILE__, __LINE__, t->tor->info.name, __VA_ARGS__ ); \
151    } while( 0 )
152
153#define dbgmsg( ... ) \
154    do { \
155        if( tr_deepLoggingIsActive( ) ) \
156            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
157    } while( 0 )
158
159/**
160***
161**/
162
163static TR_INLINE void
164managerLock( const struct tr_peerMgr * manager )
165{
166    tr_globalLock( manager->session );
167}
168
169static TR_INLINE void
170managerUnlock( const struct tr_peerMgr * manager )
171{
172    tr_globalUnlock( manager->session );
173}
174
175static TR_INLINE void
176torrentLock( Torrent * torrent )
177{
178    managerLock( torrent->manager );
179}
180
181static TR_INLINE void
182torrentUnlock( Torrent * torrent )
183{
184    managerUnlock( torrent->manager );
185}
186
187static TR_INLINE int
188torrentIsLocked( const Torrent * t )
189{
190    return tr_globalIsLocked( t->manager->session );
191}
192
193/**
194***
195**/
196
197static int
198handshakeCompareToAddr( const void * va, const void * vb )
199{
200    const tr_handshake * a = va;
201
202    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
203}
204
205static int
206handshakeCompare( const void * a, const void * b )
207{
208    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
209}
210
211static tr_handshake*
212getExistingHandshake( tr_ptrArray      * handshakes,
213                      const tr_address * addr )
214{
215    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
216}
217
218static int
219comparePeerAtomToAddress( const void * va, const void * vb )
220{
221    const struct peer_atom * a = va;
222
223    return tr_compareAddresses( &a->addr, vb );
224}
225
226static int
227comparePeerAtoms( const void * va, const void * vb )
228{
229    const struct peer_atom * b = vb;
230
231    return comparePeerAtomToAddress( va, &b->addr );
232}
233
234/**
235***
236**/
237
238static Torrent*
239getExistingTorrent( tr_peerMgr *    manager,
240                    const uint8_t * hash )
241{
242    tr_torrent * tor = tr_torrentFindFromHash( manager->session, hash );
243
244    return tor == NULL ? NULL : tor->torrentPeers;
245}
246
247static int
248peerCompare( const void * va, const void * vb )
249{
250    const tr_peer * a = va;
251    const tr_peer * b = vb;
252
253    return tr_compareAddresses( &a->addr, &b->addr );
254}
255
256static int
257peerCompareToAddr( const void * va, const void * vb )
258{
259    const tr_peer * a = va;
260
261    return tr_compareAddresses( &a->addr, vb );
262}
263
264static tr_peer*
265getExistingPeer( Torrent          * torrent,
266                 const tr_address * addr )
267{
268    assert( torrentIsLocked( torrent ) );
269    assert( addr );
270
271    return tr_ptrArrayFindSorted( &torrent->peers, addr, peerCompareToAddr );
272}
273
274static struct peer_atom*
275getExistingAtom( const Torrent    * t,
276                 const tr_address * addr )
277{
278    Torrent * tt = (Torrent*)t;
279    assert( torrentIsLocked( t ) );
280    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
281}
282
283static tr_bool
284peerIsInUse( const Torrent    * ct,
285             const tr_address * addr )
286{
287    Torrent * t = (Torrent*) ct;
288
289    assert( torrentIsLocked ( t ) );
290
291    return getExistingPeer( t, addr )
292        || getExistingHandshake( &t->outgoingHandshakes, addr )
293        || getExistingHandshake( &t->manager->incomingHandshakes, addr );
294}
295
296static tr_peer*
297peerConstructor( const tr_address * addr )
298{
299    tr_peer * p;
300    p = tr_new0( tr_peer, 1 );
301    p->addr = *addr;
302    return p;
303}
304
305static tr_peer*
306getPeer( Torrent          * torrent,
307         const tr_address * addr )
308{
309    tr_peer * peer;
310
311    assert( torrentIsLocked( torrent ) );
312
313    peer = getExistingPeer( torrent, addr );
314
315    if( peer == NULL )
316    {
317        peer = peerConstructor( addr );
318        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
319    }
320
321    return peer;
322}
323
324static void
325peerDestructor( tr_peer * peer )
326{
327    assert( peer );
328
329    if( peer->msgs != NULL )
330    {
331        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
332        tr_peerMsgsFree( peer->msgs );
333    }
334
335    tr_peerIoClear( peer->io );
336    tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
337
338    tr_bitfieldFree( peer->have );
339    tr_bitfieldFree( peer->blame );
340    tr_free( peer->client );
341
342    tr_free( peer );
343}
344
345static void
346removePeer( Torrent * t,
347            tr_peer * peer )
348{
349    tr_peer *          removed;
350    struct peer_atom * atom;
351
352    assert( torrentIsLocked( t ) );
353
354    atom = getExistingAtom( t, &peer->addr );
355    assert( atom );
356    atom->time = time( NULL );
357
358    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
359    assert( removed == peer );
360    peerDestructor( removed );
361}
362
363static void
364removeAllPeers( Torrent * t )
365{
366    while( !tr_ptrArrayEmpty( &t->peers ) )
367        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
368}
369
370static void
371torrentDestructor( void * vt )
372{
373    Torrent * t = vt;
374    uint8_t   hash[SHA_DIGEST_LENGTH];
375
376    assert( t );
377    assert( !t->isRunning );
378    assert( torrentIsLocked( t ) );
379    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
380    assert( tr_ptrArrayEmpty( &t->peers ) );
381
382    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
383
384    tr_timerFree( &t->refillTimer );
385
386    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
387    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
388    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
389    tr_ptrArrayDestruct( &t->peers, NULL );
390
391    tr_free( t->pendingRequestCount );
392    tr_free( t );
393}
394
395static void peerCallbackFunc( void * vpeer,
396                              void * vevent,
397                              void * vt );
398
399static Torrent*
400torrentConstructor( tr_peerMgr * manager,
401                    tr_torrent * tor )
402{
403    int       i;
404    Torrent * t;
405
406    t = tr_new0( Torrent, 1 );
407    t->manager = manager;
408    t->tor = tor;
409    t->pool = TR_PTR_ARRAY_INIT;
410    t->peers = TR_PTR_ARRAY_INIT;
411    t->webseeds = TR_PTR_ARRAY_INIT;
412    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
413    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
414
415    for( i = 0; i < tor->info.webseedCount; ++i )
416    {
417        tr_webseed * w =
418            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
419        tr_ptrArrayAppend( &t->webseeds, w );
420    }
421
422    return t;
423}
424
425
426static int bandwidthPulse ( void * vmgr );
427static int rechokePulse   ( void * vmgr );
428static int reconnectPulse ( void * vmgr );
429
430tr_peerMgr*
431tr_peerMgrNew( tr_session * session )
432{
433    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
434
435    m->session = session;
436    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
437    m->bandwidthTimer = tr_timerNew( session, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
438    m->rechokeTimer   = tr_timerNew( session, rechokePulse,   m, RECHOKE_PERIOD_MSEC );
439    m->reconnectTimer = tr_timerNew( session, reconnectPulse, m, RECONNECT_PERIOD_MSEC );
440
441    rechokePulse( m );
442
443    return m;
444}
445
446void
447tr_peerMgrFree( tr_peerMgr * manager )
448{
449    managerLock( manager );
450
451    tr_timerFree( &manager->reconnectTimer );
452    tr_timerFree( &manager->rechokeTimer );
453    tr_timerFree( &manager->bandwidthTimer );
454
455    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
456     * the item from manager->handshakes, so this is a little roundabout... */
457    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
458        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
459
460    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
461
462    managerUnlock( manager );
463    tr_free( manager );
464}
465
466static int
467clientIsDownloadingFrom( const tr_peer * peer )
468{
469    return peer->clientIsInterested && !peer->clientIsChoked;
470}
471
472static int
473clientIsUploadingTo( const tr_peer * peer )
474{
475    return peer->peerIsInterested && !peer->peerIsChoked;
476}
477
478/***
479****
480***/
481
482tr_bool
483tr_peerMgrPeerIsSeed( const tr_torrent  * tor,
484                      const tr_address  * addr )
485{
486    tr_bool isSeed = FALSE;
487    const Torrent * t = tor->torrentPeers;
488    const struct peer_atom * atom = getExistingAtom( t, addr );
489
490    if( atom )
491        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
492
493    return isSeed;
494}
495
496/****
497*****
498*****  REFILL
499*****
500****/
501
502static void
503assertValidPiece( Torrent * t, tr_piece_index_t piece )
504{
505    assert( t );
506    assert( t->tor );
507    assert( piece < t->tor->info.pieceCount );
508}
509
510static int
511getPieceRequests( Torrent * t, tr_piece_index_t piece )
512{
513    assertValidPiece( t, piece );
514
515    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
516}
517
518static void
519incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
520{
521    assertValidPiece( t, piece );
522
523    if( t->pendingRequestCount == NULL )
524        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
525    t->pendingRequestCount[piece]++;
526}
527
528static void
529decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
530{
531    assertValidPiece( t, piece );
532
533    if( t->pendingRequestCount )
534        t->pendingRequestCount[piece]--;
535}
536
537struct tr_refill_piece
538{
539    tr_priority_t    priority;
540    uint32_t         piece;
541    uint32_t         peerCount;
542    int              random;
543    int              pendingRequestCount;
544    int              missingBlockCount;
545};
546
547static int
548compareRefillPiece( const void * aIn, const void * bIn )
549{
550    const struct tr_refill_piece * a = aIn;
551    const struct tr_refill_piece * b = bIn;
552
553    /* if one piece has a higher priority, it goes first */
554    if( a->priority != b->priority )
555        return a->priority > b->priority ? -1 : 1;
556
557    /* have a per-priority endgame */
558    if( a->pendingRequestCount != b->pendingRequestCount )
559        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
560
561    /* fewer missing pieces goes first */
562    if( a->missingBlockCount != b->missingBlockCount )
563        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
564
565    /* otherwise if one has fewer peers, it goes first */
566    if( a->peerCount != b->peerCount )
567        return a->peerCount < b->peerCount ? -1 : 1;
568
569    /* otherwise go with our random seed */
570    if( a->random != b->random )
571        return a->random < b->random ? -1 : 1;
572
573    return 0;
574}
575
576static tr_piece_index_t *
577getPreferredPieces( Torrent * t, tr_piece_index_t * pieceCount )
578{
579    const tr_torrent  * tor = t->tor;
580    const tr_info     * inf = &tor->info;
581    tr_piece_index_t    i;
582    tr_piece_index_t    poolSize = 0;
583    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
584    int                 peerCount;
585    const tr_peer    ** peers;
586
587    assert( torrentIsLocked( t ) );
588
589    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
590    peerCount = tr_ptrArraySize( &t->peers );
591
592    /* make a list of the pieces that we want but don't have */
593    for( i = 0; i < inf->pieceCount; ++i )
594        if( !tor->info.pieces[i].dnd
595                && !tr_cpPieceIsComplete( &tor->completion, i ) )
596            pool[poolSize++] = i;
597
598    /* sort the pool by which to request next */
599    if( poolSize > 1 )
600    {
601        tr_piece_index_t j;
602        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
603
604        for( j = 0; j < poolSize; ++j )
605        {
606            int k;
607            const tr_piece_index_t piece = pool[j];
608            struct tr_refill_piece * setme = p + j;
609
610            setme->piece = piece;
611            setme->priority = inf->pieces[piece].priority;
612            setme->peerCount = 0;
613            setme->random = tr_cryptoWeakRandInt( INT_MAX );
614            setme->pendingRequestCount = getPieceRequests( t, piece );
615            setme->missingBlockCount
616                         = tr_cpMissingBlocksInPiece( &tor->completion, piece );
617
618            for( k = 0; k < peerCount; ++k )
619            {
620                const tr_peer * peer = peers[k];
621                if( peer->peerIsInterested
622                        && !peer->clientIsChoked
623                        && tr_bitfieldHas( peer->have, piece ) )
624                    ++setme->peerCount;
625            }
626        }
627
628        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
629               compareRefillPiece );
630
631        for( j = 0; j < poolSize; ++j )
632            pool[j] = p[j].piece;
633
634        tr_free( p );
635    }
636
637    *pieceCount = poolSize;
638    return pool;
639}
640
641struct tr_blockIterator
642{
643    Torrent * t;
644    tr_block_index_t blockIndex, blockCount, *blocks;
645    tr_piece_index_t pieceIndex, pieceCount, *pieces;
646};
647
648static struct tr_blockIterator*
649blockIteratorNew( Torrent * t )
650{
651    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
652    i->t = t;
653    i->pieces = getPreferredPieces( t, &i->pieceCount );
654    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCountInPiece );
655    return i;
656}
657
658static int
659blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
660{
661    int found;
662    Torrent * t = i->t;
663    tr_torrent * tor = t->tor;
664
665    while( ( i->blockIndex == i->blockCount )
666        && ( i->pieceIndex < i->pieceCount ) )
667    {
668        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
669        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
670        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
671        tr_block_index_t block;
672
673        assert( index < tor->info.pieceCount );
674
675        i->blockCount = 0;
676        i->blockIndex = 0;
677        for( block=b; block!=e; ++block )
678            if( !tr_cpBlockIsComplete( &tor->completion, block ) )
679                i->blocks[i->blockCount++] = block;
680    }
681
682    assert( i->blockCount <= tor->blockCountInPiece );
683
684    if(( found = ( i->blockIndex < i->blockCount )))
685        *setme = i->blocks[i->blockIndex++];
686
687    return found;
688}
689
690static void
691blockIteratorFree( struct tr_blockIterator * i )
692{
693    tr_free( i->blocks );
694    tr_free( i->pieces );
695    tr_free( i );
696}
697
698static tr_peer**
699getPeersUploadingToClient( Torrent * t,
700                           int *     setmeCount )
701{
702    int j;
703    int peerCount = 0;
704    int retCount = 0;
705    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
706    tr_peer ** ret = tr_new( tr_peer *, peerCount );
707
708    j = 0; /* this is a temporary test to make sure we walk through all the peers */
709    if( peerCount )
710    {
711        /* Get a list of peers we're downloading from.
712           Pick a different starting point each time so all peers
713           get a chance at being the first in line */
714        const int fencepost = tr_cryptoWeakRandInt( peerCount );
715        int i = fencepost;
716        do {
717            if( clientIsDownloadingFrom( peers[i] ) )
718                ret[retCount++] = peers[i];
719            i = ( i + 1 ) % peerCount;
720            ++j;
721        } while( i != fencepost );
722    }
723    assert( j == peerCount );
724    *setmeCount = retCount;
725    return ret;
726}
727
728static uint32_t
729getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
730{
731    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
732    const uint64_t blockPos = tor->blockSize * b;
733    assert( blockPos >= piecePos );
734    return (uint32_t)( blockPos - piecePos );
735}
736
737static int
738refillPulse( void * vtorrent )
739{
740    tr_block_index_t block;
741    int peerCount;
742    int webseedCount;
743    tr_peer ** peers;
744    tr_webseed ** webseeds;
745    struct tr_blockIterator * blockIterator;
746    Torrent * t = vtorrent;
747    tr_torrent * tor = t->tor;
748
749    if( !t->isRunning )
750        return TRUE;
751    if( tr_torrentIsSeed( t->tor ) )
752        return TRUE;
753
754    torrentLock( t );
755    tordbg( t, "Refilling Request Buffers..." );
756
757    blockIterator = blockIteratorNew( t );
758    peers = getPeersUploadingToClient( t, &peerCount );
759    webseedCount = tr_ptrArraySize( &t->webseeds );
760    webseeds = tr_memdup( tr_ptrArrayBase( &t->webseeds ),
761                          webseedCount * sizeof( tr_webseed* ) );
762
763    while( ( webseedCount || peerCount )
764        && blockIteratorNext( blockIterator, &block ) )
765    {
766        int j;
767        int handled = FALSE;
768
769        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
770        const uint32_t offset = getBlockOffsetInPiece( tor, block );
771        const uint32_t length = tr_torBlockCountBytes( tor, block );
772
773        /* find a peer who can ask for this block */
774        for( j=0; !handled && j<peerCount; )
775        {
776            const tr_addreq_t val = tr_peerMsgsAddRequest( peers[j]->msgs, index, offset, length );
777            switch( val )
778            {
779                case TR_ADDREQ_FULL:
780                case TR_ADDREQ_CLIENT_CHOKED:
781                    peers[j] = peers[--peerCount];
782                    break;
783
784                case TR_ADDREQ_MISSING:
785                case TR_ADDREQ_DUPLICATE:
786                    ++j;
787                    break;
788
789                case TR_ADDREQ_OK:
790                    incrementPieceRequests( t, index );
791                    handled = TRUE;
792                    break;
793
794                default:
795                    assert( 0 && "unhandled value" );
796                    break;
797            }
798        }
799
800        /* maybe one of the webseeds can do it */
801        for( j=0; !handled && j<webseedCount; )
802        {
803            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j], index, offset, length );
804            switch( val )
805            {
806                case TR_ADDREQ_FULL:
807                    webseeds[j] = webseeds[--webseedCount];
808                    break;
809
810                case TR_ADDREQ_OK:
811                    incrementPieceRequests( t, index );
812                    handled = TRUE;
813                    break;
814
815                default:
816                    assert( 0 && "unhandled value" );
817                    break;
818            }
819        }
820    }
821
822    /* cleanup */
823    blockIteratorFree( blockIterator );
824    tr_free( webseeds );
825    tr_free( peers );
826
827    t->refillTimer = NULL;
828    torrentUnlock( t );
829    return FALSE;
830}
831
832static void
833broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
834{
835    size_t i;
836    size_t peerCount;
837    tr_peer ** peers;
838
839    assert( torrentIsLocked( t ) );
840
841    tordbg( t, "got a block; cancelling any duplicate requests from peers %"PRIu32":%"PRIu32"->%"PRIu32, index, offset, length );
842
843    peerCount = tr_ptrArraySize( &t->peers );
844    peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
845    for( i=0; i<peerCount; ++i )
846        if( peers[i]->msgs )
847            tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
848}
849
850static void
851addStrike( Torrent * t,
852           tr_peer * peer )
853{
854    tordbg( t, "increasing peer %s strike count to %d",
855            tr_peerIoAddrStr( &peer->addr,
856                              peer->port ), peer->strikes + 1 );
857
858    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
859    {
860        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
861        atom->myflags |= MYFLAG_BANNED;
862        peer->doPurge = 1;
863        tordbg( t, "banning peer %s", tr_peerIoAddrStr( &atom->addr, atom->port ) );
864    }
865}
866
867static void
868gotBadPiece( Torrent *        t,
869             tr_piece_index_t pieceIndex )
870{
871    tr_torrent *   tor = t->tor;
872    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
873
874    tor->corruptCur += byteCount;
875    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
876}
877
878static void
879refillSoon( Torrent * t )
880{
881    if( t->refillTimer == NULL )
882        t->refillTimer = tr_timerNew( t->manager->session,
883                                      refillPulse, t,
884                                      REFILL_PERIOD_MSEC );
885}
886
887static void
888peerSuggestedPiece( Torrent            * t UNUSED,
889                    tr_peer            * peer UNUSED,
890                    tr_piece_index_t     pieceIndex UNUSED,
891                    int                  isFastAllowed UNUSED )
892{
893#if 0
894    assert( t );
895    assert( peer );
896    assert( peer->msgs );
897
898    /* is this a valid piece? */
899    if(  pieceIndex >= t->tor->info.pieceCount )
900        return;
901
902    /* don't ask for it if we've already got it */
903    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
904        return;
905
906    /* don't ask for it if they don't have it */
907    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
908        return;
909
910    /* don't ask for it if we're choked and it's not fast */
911    if( !isFastAllowed && peer->clientIsChoked )
912        return;
913
914    /* request the blocks that we don't have in this piece */
915    {
916        tr_block_index_t block;
917        const tr_torrent * tor = t->tor;
918        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
919        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
920
921        for( block=start; block<end; ++block )
922        {
923            if( !tr_cpBlockIsComplete( tor->completion, block ) )
924            {
925                const uint32_t offset = getBlockOffsetInPiece( tor, block );
926                const uint32_t length = tr_torBlockCountBytes( tor, block );
927                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
928                incrementPieceRequests( t, pieceIndex );
929            }
930        }
931    }
932#endif
933}
934
935static void
936fireRatioLimitHit( tr_torrent * tor )
937{
938    assert( tr_isTorrent( tor ) );
939
940    if( tor->ratio_limit_hit_func )
941        tor->ratio_limit_hit_func( tor, tor->ratio_limit_hit_func_user_data );
942}
943
944static void
945peerCallbackFunc( void * vpeer, void * vevent, void * vt )
946{
947    tr_peer * peer = vpeer; /* may be NULL if peer is a webseed */
948    Torrent * t = vt;
949    const tr_peer_event * e = vevent;
950
951    torrentLock( t );
952
953    switch( e->eventType )
954    {
955        case TR_PEER_UPLOAD_ONLY:
956            /* update our atom */
957            if( peer ) {
958                struct peer_atom * a = getExistingAtom( t, &peer->addr );
959                a->uploadOnly = e->uploadOnly ? UPLOAD_ONLY_YES : UPLOAD_ONLY_NO;
960            }
961            break;
962
963        case TR_PEER_NEED_REQ:
964            refillSoon( t );
965            break;
966
967        case TR_PEER_CANCEL:
968            decrementPieceRequests( t, e->pieceIndex );
969            break;
970
971        case TR_PEER_PEER_GOT_DATA:
972        {
973            const time_t now = time( NULL );
974            tr_torrent * tor = t->tor;
975            double seedRatio;
976
977            tor->activityDate = now;
978
979            if( e->wasPieceData )
980                tor->uploadedCur += e->length;
981
982            /* update the stats */
983            if( e->wasPieceData )
984                tr_statsAddUploaded( tor->session, e->length );
985
986            /* update our atom */
987            if( peer ) {
988                struct peer_atom * a = getExistingAtom( t, &peer->addr );
989                if( e->wasPieceData )
990                    a->piece_data_time = now;
991            }
992
993            /* if we're seeding and we've reached our seed ratio limit, stop the torrent */
994            if( tr_torrentIsSeed( tor ) && tr_torrentGetSeedRatio( tor, &seedRatio ) ) {
995                double up = (double)tor->uploadedCur + (double)tor->uploadedPrev;
996                double down = (double)tor->downloadedCur + (double)tor->downloadedPrev;
997                double ratio = tr_getRatio( up, down );
998                if( ratio >= seedRatio ) {
999                    tr_torrentStop( tor );
1000                   
1001                    /* set to no ratio limit to allow easy restarting */
1002                    tr_torrentSetRatioMode( tor, TR_RATIOLIMIT_UNLIMITED );
1003                   
1004                    fireRatioLimitHit( tor );
1005                }
1006            }
1007
1008            break;
1009        }
1010
1011        case TR_PEER_CLIENT_GOT_SUGGEST:
1012            if( peer )
1013                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1014            break;
1015
1016        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1017            if( peer )
1018                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1019            break;
1020
1021        case TR_PEER_CLIENT_GOT_DATA:
1022        {
1023            const time_t now = time( NULL );
1024            tr_torrent * tor = t->tor;
1025            tor->activityDate = now;
1026
1027            /* only add this to downloadedCur if we got it from a peer --
1028             * webseeds shouldn't count against our ratio.  As one tracker
1029             * admin put it, "Those pieces are downloaded directly from the
1030             * content distributor, not the peers, it is the tracker's job
1031             * to manage the swarms, not the web server and does not fit
1032             * into the jurisdiction of the tracker." */
1033            if( peer && e->wasPieceData )
1034                tor->downloadedCur += e->length;
1035
1036            /* update the stats */ 
1037            if( e->wasPieceData )
1038                tr_statsAddDownloaded( tor->session, e->length );
1039
1040            /* update our atom */
1041            if( peer ) {
1042                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1043                if( e->wasPieceData )
1044                    a->piece_data_time = now;
1045            }
1046
1047            break;
1048        }
1049
1050        case TR_PEER_PEER_PROGRESS:
1051        {
1052            if( peer )
1053            {
1054                struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1055                const int peerIsSeed = e->progress >= 1.0;
1056                if( peerIsSeed ) {
1057                    tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1058                    atom->flags |= ADDED_F_SEED_FLAG;
1059                } else {
1060                    tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1061                    atom->flags &= ~ADDED_F_SEED_FLAG;
1062                }
1063            }
1064            break;
1065        }
1066
1067        case TR_PEER_CLIENT_GOT_BLOCK:
1068        {
1069            tr_torrent * tor = t->tor;
1070
1071            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1072
1073            tr_cpBlockAdd( &tor->completion, block );
1074            decrementPieceRequests( t, e->pieceIndex );
1075
1076            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1077
1078            if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1079            {
1080                const tr_piece_index_t p = e->pieceIndex;
1081                const tr_bool ok = tr_ioTestPiece( tor, p, NULL, 0 );
1082
1083                if( !ok )
1084                {
1085                    tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1086                               (unsigned long)p );
1087                }
1088
1089                tr_torrentSetHasPiece( tor, p, ok );
1090                tr_torrentSetPieceChecked( tor, p, TRUE );
1091                tr_peerMgrSetBlame( tor, p, ok );
1092
1093                if( !ok )
1094                {
1095                    gotBadPiece( t, p );
1096                }
1097                else
1098                {
1099                    int i;
1100                    int peerCount;
1101                    tr_peer ** peers;
1102                    tr_file_index_t fileIndex;
1103
1104                    peerCount = tr_ptrArraySize( &t->peers );
1105                    peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1106                    for( i=0; i<peerCount; ++i )
1107                        tr_peerMsgsHave( peers[i]->msgs, p );
1108
1109                    for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex )
1110                    {
1111                        const tr_file * file = &tor->info.files[fileIndex];
1112                        if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) && tr_cpFileIsComplete( &tor->completion, fileIndex ) )
1113                        {
1114                            char * path = tr_buildPath( tor->downloadDir, file->name, NULL );
1115                            tordbg( t, "closing recently-completed file \"%s\"", path );
1116                            tr_fdFileClose( path );
1117                            tr_free( path );
1118                        }
1119                    }
1120                }
1121
1122                tr_torrentRecheckCompleteness( tor );
1123            }
1124            break;
1125        }
1126
1127        case TR_PEER_ERROR:
1128            if( e->err == EINVAL )
1129            {
1130                addStrike( t, peer );
1131                peer->doPurge = 1;
1132                tordbg( t, "setting %s doPurge flag because we got an EINVAL error", tr_peerIoAddrStr( &peer->addr, peer->port ) );
1133            }
1134            else if( ( e->err == ERANGE )
1135                  || ( e->err == EMSGSIZE )
1136                  || ( e->err == ENOTCONN ) )
1137            {
1138                /* some protocol error from the peer */
1139                peer->doPurge = 1;
1140                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error", tr_peerIoAddrStr( &peer->addr, peer->port ) );
1141            }
1142            else /* a local error, such as an IO error */
1143            {
1144                t->tor->error = e->err;
1145                tr_strlcpy( t->tor->errorString,
1146                            tr_strerror( t->tor->error ),
1147                            sizeof( t->tor->errorString ) );
1148                tr_torrentStop( t->tor );
1149            }
1150            break;
1151
1152        default:
1153            assert( 0 );
1154    }
1155
1156    torrentUnlock( t );
1157}
1158
1159static void
1160ensureAtomExists( Torrent          * t,
1161                  const tr_address * addr,
1162                  tr_port            port,
1163                  uint8_t            flags,
1164                  uint8_t            from )
1165{
1166    if( getExistingAtom( t, addr ) == NULL )
1167    {
1168        struct peer_atom * a;
1169        a = tr_new0( struct peer_atom, 1 );
1170        a->addr = *addr;
1171        a->port = port;
1172        a->flags = flags;
1173        a->from = from;
1174        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr( &a->addr, a->port ) );
1175        tr_ptrArrayInsertSorted( &t->pool, a, comparePeerAtoms );
1176    }
1177}
1178
1179static int
1180getMaxPeerCount( const tr_torrent * tor )
1181{
1182    return tor->maxConnectedPeers;
1183}
1184
1185static int
1186getPeerCount( const Torrent * t )
1187{
1188    return tr_ptrArraySize( &t->peers );// + tr_ptrArraySize( &t->outgoingHandshakes );
1189}
1190
1191/* FIXME: this is kind of a mess. */
1192static tr_bool
1193myHandshakeDoneCB( tr_handshake  * handshake,
1194                   tr_peerIo     * io,
1195                   tr_bool         isConnected,
1196                   const uint8_t * peer_id,
1197                   void          * vmanager )
1198{
1199    tr_bool            ok = isConnected;
1200    tr_bool            success = FALSE;
1201    tr_port            port;
1202    const tr_address * addr;
1203    tr_peerMgr       * manager = vmanager;
1204    Torrent          * t;
1205    tr_handshake     * ours;
1206
1207    assert( io );
1208    assert( tr_isBool( ok ) );
1209
1210    t = tr_peerIoHasTorrentHash( io )
1211        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1212        : NULL;
1213
1214    if( tr_peerIoIsIncoming ( io ) )
1215        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1216                                        handshake, handshakeCompare );
1217    else if( t )
1218        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1219                                        handshake, handshakeCompare );
1220    else
1221        ours = handshake;
1222
1223    assert( ours );
1224    assert( ours == handshake );
1225
1226    if( t )
1227        torrentLock( t );
1228
1229    addr = tr_peerIoGetAddress( io, &port );
1230
1231    if( !ok || !t || !t->isRunning )
1232    {
1233        if( t )
1234        {
1235            struct peer_atom * atom = getExistingAtom( t, addr );
1236            if( atom )
1237                ++atom->numFails;
1238        }
1239    }
1240    else /* looking good */
1241    {
1242        struct peer_atom * atom;
1243        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1244        atom = getExistingAtom( t, addr );
1245        atom->time = time( NULL );
1246        atom->piece_data_time = 0;
1247
1248        if( atom->myflags & MYFLAG_BANNED )
1249        {
1250            tordbg( t, "banned peer %s tried to reconnect",
1251                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
1252        }
1253        else if( tr_peerIoIsIncoming( io )
1254               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1255
1256        {
1257        }
1258        else
1259        {
1260            tr_peer * peer = getExistingPeer( t, addr );
1261
1262            if( peer ) /* we already have this peer */
1263            {
1264            }
1265            else
1266            {
1267                peer = getPeer( t, addr );
1268                tr_free( peer->client );
1269
1270                if( !peer_id )
1271                    peer->client = NULL;
1272                else {
1273                    char client[128];
1274                    tr_clientForId( client, sizeof( client ), peer_id );
1275                    peer->client = tr_strdup( client );
1276                }
1277
1278                peer->port = port;
1279                peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is
1280                                                                balanced by our unref in peerDestructor()  */
1281                tr_peerIoSetParent( peer->io, t->tor->bandwidth );
1282                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1283
1284                success = TRUE;
1285            }
1286        }
1287    }
1288
1289    if( t )
1290        torrentUnlock( t );
1291
1292    return success;
1293}
1294
1295void
1296tr_peerMgrAddIncoming( tr_peerMgr * manager,
1297                       tr_address * addr,
1298                       tr_port      port,
1299                       int          socket )
1300{
1301    managerLock( manager );
1302
1303    if( tr_sessionIsAddressBlocked( manager->session, addr ) )
1304    {
1305        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
1306        tr_netClose( socket );
1307    }
1308    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
1309    {
1310        tr_netClose( socket );
1311    }
1312    else /* we don't have a connetion to them yet... */
1313    {
1314        tr_peerIo *    io;
1315        tr_handshake * handshake;
1316
1317        io = tr_peerIoNewIncoming( manager->session, manager->session->bandwidth, addr, port, socket );
1318
1319        handshake = tr_handshakeNew( io,
1320                                     manager->session->encryptionMode,
1321                                     myHandshakeDoneCB,
1322                                     manager );
1323
1324        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
1325
1326        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
1327                                 handshakeCompare );
1328    }
1329
1330    managerUnlock( manager );
1331}
1332
1333static tr_bool
1334tr_isPex( const tr_pex * pex )
1335{
1336    return pex && tr_isAddress( &pex->addr );
1337}
1338
1339void
1340tr_peerMgrAddPex( tr_torrent   *  tor,
1341                  uint8_t         from,
1342                  const tr_pex *  pex )
1343{
1344    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
1345    {
1346        Torrent * t = tor->torrentPeers;
1347        managerLock( t->manager );
1348
1349        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
1350            if( tr_isValidPeerAddress( &pex->addr, pex->port ) )
1351                ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from );
1352
1353        managerUnlock( t->manager );
1354    }
1355}
1356
1357tr_pex *
1358tr_peerMgrCompactToPex( const void *    compact,
1359                        size_t          compactLen,
1360                        const uint8_t * added_f,
1361                        size_t          added_f_len,
1362                        size_t *        pexCount )
1363{
1364    size_t          i;
1365    size_t          n = compactLen / 6;
1366    const uint8_t * walk = compact;
1367    tr_pex *        pex = tr_new0( tr_pex, n );
1368
1369    for( i = 0; i < n; ++i )
1370    {
1371        pex[i].addr.type = TR_AF_INET;
1372        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
1373        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1374        if( added_f && ( n == added_f_len ) )
1375            pex[i].flags = added_f[i];
1376    }
1377
1378    *pexCount = n;
1379    return pex;
1380}
1381
1382tr_pex *
1383tr_peerMgrCompact6ToPex( const void    * compact,
1384                         size_t          compactLen,
1385                         const uint8_t * added_f,
1386                         size_t          added_f_len,
1387                         size_t        * pexCount )
1388{
1389    size_t          i;
1390    size_t          n = compactLen / 18;
1391    const uint8_t * walk = compact;
1392    tr_pex *        pex = tr_new0( tr_pex, n );
1393   
1394    for( i = 0; i < n; ++i )
1395    {
1396        pex[i].addr.type = TR_AF_INET6;
1397        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
1398        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1399        if( added_f && ( n == added_f_len ) )
1400            pex[i].flags = added_f[i];
1401    }
1402   
1403    *pexCount = n;
1404    return pex;
1405}
1406
1407tr_pex *
1408tr_peerMgrArrayToPex( const void * array,
1409                      size_t       arrayLen,
1410                      size_t      * pexCount )
1411{
1412    size_t          i;
1413    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
1414    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
1415    const uint8_t * walk = array;
1416    tr_pex        * pex = tr_new0( tr_pex, n );
1417   
1418    for( i = 0 ; i < n ; i++ ) {
1419        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
1420        tr_suspectAddress( &pex[i].addr, "tracker"  );
1421        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
1422        pex[i].flags = 0x00;
1423        walk += sizeof( tr_address ) + 2;
1424    }
1425   
1426    *pexCount = n;
1427    return pex;
1428}
1429
1430/**
1431***
1432**/
1433
1434void
1435tr_peerMgrSetBlame( tr_torrent     * tor,
1436                    tr_piece_index_t pieceIndex,
1437                    int              success )
1438{
1439    if( !success )
1440    {
1441        int        peerCount, i;
1442        Torrent *  t = tor->torrentPeers;
1443        tr_peer ** peers;
1444
1445        assert( torrentIsLocked( t ) );
1446
1447        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
1448        for( i = 0; i < peerCount; ++i )
1449        {
1450            tr_peer * peer = peers[i];
1451            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1452            {
1453                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1454                        tr_peerIoAddrStr( &peer->addr, peer->port ),
1455                        pieceIndex, (int)peer->strikes + 1 );
1456                addStrike( t, peer );
1457            }
1458        }
1459    }
1460}
1461
1462int
1463tr_pexCompare( const void * va, const void * vb )
1464{
1465    const tr_pex * a = va;
1466    const tr_pex * b = vb;
1467    int i;
1468
1469    assert( tr_isPex( a ) );
1470    assert( tr_isPex( b ) );
1471
1472    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
1473        return i;
1474
1475    if( a->port != b->port )
1476        return a->port < b->port ? -1 : 1;
1477
1478    return 0;
1479}
1480
1481static int
1482peerPrefersCrypto( const tr_peer * peer )
1483{
1484    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1485        return TRUE;
1486
1487    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1488        return FALSE;
1489
1490    return tr_peerIoIsEncrypted( peer->io );
1491}
1492
1493int
1494tr_peerMgrGetPeers( tr_torrent      * tor,
1495                    tr_pex         ** setme_pex,
1496                    uint8_t           af)
1497{
1498    int peersReturning = 0;
1499    const Torrent * t = tor->torrentPeers;
1500
1501    managerLock( t->manager );
1502
1503    {
1504        int i;
1505        const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1506        const int peerCount = tr_ptrArraySize( &t->peers );
1507        /* for now, this will waste memory on torrents that have both
1508         * ipv6 and ipv4 peers */
1509        tr_pex * pex = tr_new( tr_pex, peerCount );
1510        tr_pex * walk = pex;
1511
1512        for( i=0; i<peerCount; ++i )
1513        {
1514            const tr_peer * peer = peers[i];
1515            if( peer->addr.type == af )
1516            {
1517                const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1518
1519                assert( tr_isAddress( &peer->addr ) );
1520                walk->addr = peer->addr;
1521                walk->port = peer->port;
1522                walk->flags = 0;
1523                if( peerPrefersCrypto( peer ) )
1524                    walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1525                if( ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 ) )
1526                    walk->flags |= ADDED_F_SEED_FLAG;
1527                ++peersReturning;
1528                ++walk;
1529            }
1530        }
1531
1532        assert( ( walk - pex ) == peersReturning );
1533        qsort( pex, peersReturning, sizeof( tr_pex ), tr_pexCompare );
1534        *setme_pex = pex;
1535    }
1536
1537    managerUnlock( t->manager );
1538    return peersReturning;
1539}
1540
1541void
1542tr_peerMgrStartTorrent( tr_torrent * tor )
1543{
1544    Torrent * t = tor->torrentPeers;
1545
1546    managerLock( t->manager );
1547
1548    assert( t );
1549
1550    if( !t->isRunning )
1551    {
1552        t->isRunning = TRUE;
1553
1554        if( !tr_ptrArrayEmpty( &t->webseeds ) )
1555            refillSoon( t );
1556    }
1557
1558    managerUnlock( t->manager );
1559}
1560
1561static void
1562stopTorrent( Torrent * t )
1563{
1564    assert( torrentIsLocked( t ) );
1565
1566    t->isRunning = FALSE;
1567
1568    /* disconnect the peers. */
1569    tr_ptrArrayForeach( &t->peers, (PtrArrayForeachFunc)peerDestructor );
1570    tr_ptrArrayClear( &t->peers );
1571
1572    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1573     * which removes the handshake from t->outgoingHandshakes... */
1574    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
1575        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
1576}
1577
1578void
1579tr_peerMgrStopTorrent( tr_torrent * tor )
1580{
1581    Torrent * t = tor->torrentPeers;
1582
1583    managerLock( t->manager );
1584
1585    stopTorrent( t );
1586
1587    managerUnlock( t->manager );
1588}
1589
1590void
1591tr_peerMgrAddTorrent( tr_peerMgr * manager,
1592                      tr_torrent * tor )
1593{
1594    managerLock( manager );
1595
1596    assert( tor );
1597    assert( tor->torrentPeers == NULL );
1598
1599    tor->torrentPeers = torrentConstructor( manager, tor );
1600
1601    managerUnlock( manager );
1602}
1603
1604void
1605tr_peerMgrRemoveTorrent( tr_torrent * tor )
1606{
1607    tr_torrentLock( tor );
1608
1609    stopTorrent( tor->torrentPeers );
1610    torrentDestructor( tor->torrentPeers );
1611
1612    tr_torrentUnlock( tor );
1613}
1614
1615void
1616tr_peerMgrTorrentAvailability( const tr_torrent * tor,
1617                               int8_t           * tab,
1618                               unsigned int       tabCount )
1619{
1620    tr_piece_index_t   i;
1621    const Torrent *    t;
1622    float              interval;
1623    tr_bool            isSeed;
1624    int                peerCount;
1625    const tr_peer **   peers;
1626    tr_torrentLock( tor );
1627
1628    t = tor->torrentPeers;
1629    tor = t->tor;
1630    interval = tor->info.pieceCount / (float)tabCount;
1631    isSeed = tor && ( tr_cpGetStatus ( &tor->completion ) == TR_SEED );
1632    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1633    peerCount = tr_ptrArraySize( &t->peers );
1634
1635    memset( tab, 0, tabCount );
1636
1637    for( i = 0; tor && i < tabCount; ++i )
1638    {
1639        const int piece = i * interval;
1640
1641        if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
1642            tab[i] = -1;
1643        else if( peerCount ) {
1644            int j;
1645            for( j = 0; j < peerCount; ++j )
1646                if( tr_bitfieldHas( peers[j]->have, i ) )
1647                    ++tab[i];
1648        }
1649    }
1650
1651    tr_torrentUnlock( tor );
1652}
1653
1654/* Returns the pieces that are available from peers */
1655tr_bitfield*
1656tr_peerMgrGetAvailable( const tr_torrent * tor )
1657{
1658    int i;
1659    int peerCount;
1660    Torrent * t = tor->torrentPeers;
1661    const tr_peer ** peers;
1662    tr_bitfield * pieces;
1663    managerLock( t->manager );
1664
1665    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1666    peerCount = tr_ptrArraySize( &t->peers );
1667    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1668    for( i=0; i<peerCount; ++i )
1669        tr_bitfieldOr( pieces, peers[i]->have );
1670
1671    managerUnlock( t->manager );
1672    return pieces;
1673}
1674
1675void
1676tr_peerMgrTorrentStats( tr_torrent       * tor,
1677                        int              * setmePeersKnown,
1678                        int              * setmePeersConnected,
1679                        int              * setmeSeedsConnected,
1680                        int              * setmeWebseedsSendingToUs,
1681                        int              * setmePeersSendingToUs,
1682                        int              * setmePeersGettingFromUs,
1683                        int              * setmePeersFrom )
1684{
1685    int i, size;
1686    const Torrent * t = tor->torrentPeers;
1687    const tr_peer ** peers;
1688    const tr_webseed ** webseeds;
1689
1690    managerLock( t->manager );
1691
1692    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1693    size = tr_ptrArraySize( &t->peers );
1694
1695    *setmePeersKnown           = tr_ptrArraySize( &t->pool );
1696    *setmePeersConnected       = 0;
1697    *setmeSeedsConnected       = 0;
1698    *setmePeersGettingFromUs   = 0;
1699    *setmePeersSendingToUs     = 0;
1700    *setmeWebseedsSendingToUs  = 0;
1701
1702    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1703        setmePeersFrom[i] = 0;
1704
1705    for( i=0; i<size; ++i )
1706    {
1707        const tr_peer * peer = peers[i];
1708        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1709
1710        if( peer->io == NULL ) /* not connected */
1711            continue;
1712
1713        ++*setmePeersConnected;
1714
1715        ++setmePeersFrom[atom->from];
1716
1717        if( clientIsDownloadingFrom( peer ) )
1718            ++*setmePeersSendingToUs;
1719
1720        if( clientIsUploadingTo( peer ) )
1721            ++*setmePeersGettingFromUs;
1722
1723        if( atom->flags & ADDED_F_SEED_FLAG )
1724            ++*setmeSeedsConnected;
1725    }
1726
1727    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1728    size = tr_ptrArraySize( &t->webseeds );
1729    for( i=0; i<size; ++i )
1730        if( tr_webseedIsActive( webseeds[i] ) )
1731            ++*setmeWebseedsSendingToUs;
1732
1733    managerUnlock( t->manager );
1734}
1735
1736float*
1737tr_peerMgrWebSpeeds( const tr_torrent * tor )
1738{
1739    const Torrent * t = tor->torrentPeers;
1740    const tr_webseed ** webseeds;
1741    int i;
1742    int webseedCount;
1743    float * ret;
1744    uint64_t now;
1745
1746    assert( t->manager );
1747    managerLock( t->manager );
1748
1749    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1750    webseedCount = tr_ptrArraySize( &t->webseeds );
1751    assert( webseedCount == tor->info.webseedCount );
1752    ret = tr_new0( float, webseedCount );
1753    now = tr_date( );
1754
1755    for( i=0; i<webseedCount; ++i )
1756        if( !tr_webseedGetSpeed( webseeds[i], now, &ret[i] ) )
1757            ret[i] = -1.0;
1758
1759    managerUnlock( t->manager );
1760    return ret;
1761}
1762
1763double
1764tr_peerGetPieceSpeed( const tr_peer * peer, uint64_t now, tr_direction direction )
1765{
1766    return peer->io ? tr_peerIoGetPieceSpeed( peer->io, now, direction ) : 0.0;
1767}
1768
1769
1770struct tr_peer_stat *
1771tr_peerMgrPeerStats( const tr_torrent    * tor,
1772                     int                 * setmeCount )
1773{
1774    int i, size;
1775    const Torrent * t = tor->torrentPeers;
1776    const tr_peer ** peers;
1777    tr_peer_stat * ret;
1778    uint64_t now;
1779
1780    assert( t->manager );
1781    managerLock( t->manager );
1782
1783    size = tr_ptrArraySize( &t->peers );
1784    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1785    ret = tr_new0( tr_peer_stat, size );
1786    now = tr_date( );
1787
1788    for( i = 0; i < size; ++i )
1789    {
1790        char *                   pch;
1791        const tr_peer *          peer = peers[i];
1792        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1793        tr_peer_stat *           stat = ret + i;
1794        tr_address               norm_addr;
1795
1796        norm_addr = peer->addr;
1797        tr_normalizeV4Mapped( &norm_addr );
1798        tr_ntop( &norm_addr, stat->addr, sizeof( stat->addr ) );
1799        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
1800                   sizeof( stat->client ) );
1801        stat->port               = ntohs( peer->port );
1802        stat->from               = atom->from;
1803        stat->progress           = peer->progress;
1804        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1805        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER );
1806        stat->rateToClient       = tr_peerGetPieceSpeed( peer, now, TR_PEER_TO_CLIENT );
1807        stat->peerIsChoked       = peer->peerIsChoked;
1808        stat->peerIsInterested   = peer->peerIsInterested;
1809        stat->clientIsChoked     = peer->clientIsChoked;
1810        stat->clientIsInterested = peer->clientIsInterested;
1811        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1812        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1813        stat->isUploadingTo      = clientIsUploadingTo( peer );
1814        stat->isSeed             = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
1815
1816        pch = stat->flagStr;
1817        if( t->optimistic == peer ) *pch++ = 'O';
1818        if( stat->isDownloadingFrom ) *pch++ = 'D';
1819        else if( stat->clientIsInterested ) *pch++ = 'd';
1820        if( stat->isUploadingTo ) *pch++ = 'U';
1821        else if( stat->peerIsInterested ) *pch++ = 'u';
1822        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
1823        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1824        if( stat->isEncrypted ) *pch++ = 'E';
1825        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1826        if( stat->isIncoming ) *pch++ = 'I';
1827        *pch = '\0';
1828    }
1829
1830    *setmeCount = size;
1831
1832    managerUnlock( t->manager );
1833    return ret;
1834}
1835
1836/**
1837***
1838**/
1839
1840struct ChokeData
1841{
1842    tr_bool         doUnchoke;
1843    tr_bool         isInterested;
1844    tr_bool         isChoked;
1845    int             rate;
1846    tr_peer *       peer;
1847};
1848
1849static int
1850compareChoke( const void * va,
1851              const void * vb )
1852{
1853    const struct ChokeData * a = va;
1854    const struct ChokeData * b = vb;
1855
1856    if( a->rate != b->rate ) /* prefer higher overall speeds */
1857        return a->rate > b->rate ? -1 : 1;
1858
1859    if( a->isChoked != b->isChoked ) /* prefer unchoked */
1860        return a->isChoked ? 1 : -1;
1861
1862    return 0;
1863}
1864
1865static int
1866isNew( const tr_peer * peer )
1867{
1868    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1869}
1870
1871static int
1872isSame( const tr_peer * peer )
1873{
1874    return peer && peer->client && strstr( peer->client, "Transmission" );
1875}
1876
1877/**
1878***
1879**/
1880
1881static void
1882rechokeTorrent( Torrent * t )
1883{
1884    int i, size, unchokedInterested;
1885    const int peerCount = tr_ptrArraySize( &t->peers );
1886    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1887    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1888    const tr_session * session = t->manager->session;
1889    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
1890    const uint64_t now = tr_date( );
1891
1892    assert( torrentIsLocked( t ) );
1893
1894    /* sort the peers by preference and rate */
1895    for( i = 0, size = 0; i < peerCount; ++i )
1896    {
1897        tr_peer * peer = peers[i];
1898        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1899
1900        if( peer->progress >= 1.0 ) /* choke all seeds */
1901        {
1902            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1903        }
1904        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
1905        {
1906            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1907        }
1908        else if( chokeAll ) /* choke everyone if we're not uploading */
1909        {
1910            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1911        }
1912        else
1913        {
1914            struct ChokeData * n = &choke[size++];
1915            n->peer         = peer;
1916            n->isInterested = peer->peerIsInterested;
1917            n->isChoked     = peer->peerIsChoked;
1918            n->rate         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER ) * 1024;
1919        }
1920    }
1921
1922    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
1923
1924    /**
1925     * Reciprocation and number of uploads capping is managed by unchoking
1926     * the N peers which have the best upload rate and are interested.
1927     * This maximizes the client's download rate. These N peers are
1928     * referred to as downloaders, because they are interested in downloading
1929     * from the client.
1930     *
1931     * Peers which have a better upload rate (as compared to the downloaders)
1932     * but aren't interested get unchoked. If they become interested, the
1933     * downloader with the worst upload rate gets choked. If a client has
1934     * a complete file, it uses its upload rate rather than its download
1935     * rate to decide which peers to unchoke.
1936     */
1937    unchokedInterested = 0;
1938    for( i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i ) {
1939        choke[i].doUnchoke = 1;
1940        if( choke[i].isInterested )
1941            ++unchokedInterested;
1942    }
1943
1944    /* optimistic unchoke */
1945    if( i < size )
1946    {
1947        int n;
1948        struct ChokeData * c;
1949        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
1950
1951        for( ; i<size; ++i )
1952        {
1953            if( choke[i].isInterested )
1954            {
1955                const tr_peer * peer = choke[i].peer;
1956                int x = 1, y;
1957                if( isNew( peer ) ) x *= 3;
1958                if( isSame( peer ) ) x *= 3;
1959                for( y=0; y<x; ++y )
1960                    tr_ptrArrayAppend( &randPool, &choke[i] );
1961            }
1962        }
1963
1964        if(( n = tr_ptrArraySize( &randPool )))
1965        {
1966            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
1967            c->doUnchoke = 1;
1968            t->optimistic = c->peer;
1969        }
1970
1971        tr_ptrArrayDestruct( &randPool, NULL );
1972    }
1973
1974    for( i=0; i<size; ++i )
1975        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1976
1977    /* cleanup */
1978    tr_free( choke );
1979}
1980
1981static int
1982rechokePulse( void * vmgr )
1983{
1984    tr_torrent * tor = NULL;
1985    tr_peerMgr * mgr = vmgr;
1986    managerLock( mgr );
1987
1988    while(( tor = tr_torrentNext( mgr->session, tor )))
1989        if( tor->isRunning )
1990            rechokeTorrent( tor->torrentPeers );
1991
1992    managerUnlock( mgr );
1993    return TRUE;
1994}
1995
1996/***
1997****
1998****  Life and Death
1999****
2000***/
2001
2002typedef enum
2003{
2004    TR_CAN_KEEP,
2005    TR_CAN_CLOSE,
2006    TR_MUST_CLOSE,
2007}
2008tr_close_type_t;
2009
2010static tr_close_type_t
2011shouldPeerBeClosed( const Torrent    * t,
2012                    const tr_peer    * peer,
2013                    int                peerCount )
2014{
2015    const tr_torrent *       tor = t->tor;
2016    const time_t             now = time( NULL );
2017    const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
2018
2019    /* if it's marked for purging, close it */
2020    if( peer->doPurge )
2021    {
2022        tordbg( t, "purging peer %s because its doPurge flag is set",
2023                tr_peerIoAddrStr( &atom->addr, atom->port ) );
2024        return TR_MUST_CLOSE;
2025    }
2026
2027    /* if we're seeding and the peer has everything we have,
2028     * and enough time has passed for a pex exchange, then disconnect */
2029    if( tr_torrentIsSeed( tor ) )
2030    {
2031        int peerHasEverything;
2032        if( atom->flags & ADDED_F_SEED_FLAG )
2033            peerHasEverything = TRUE;
2034        else if( peer->progress < tr_cpPercentDone( &tor->completion ) )
2035            peerHasEverything = FALSE;
2036        else {
2037            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( &tor->completion ) );
2038            tr_bitfieldDifference( tmp, peer->have );
2039            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2040            tr_bitfieldFree( tmp );
2041        }
2042
2043        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 )))
2044        {
2045            tordbg( t, "purging peer %s because we're both seeds",
2046                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
2047            return TR_MUST_CLOSE;
2048        }
2049    }
2050
2051    /* disconnect if it's been too long since piece data has been transferred.
2052     * this is on a sliding scale based on number of available peers... */
2053    {
2054        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2055        /* if we have >= relaxIfFewerThan, strictness is 100%.
2056         * if we have zero connections, strictness is 0% */
2057        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
2058                               ? 1.0
2059                               : peerCount / (float)relaxStrictnessIfFewerThanN;
2060        const int lo = MIN_UPLOAD_IDLE_SECS;
2061        const int hi = MAX_UPLOAD_IDLE_SECS;
2062        const int limit = hi - ( ( hi - lo ) * strictness );
2063        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
2064/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
2065        if( idleTime > limit ) {
2066            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
2067                       tr_peerIoAddrStr( &atom->addr, atom->port ), idleTime );
2068            return TR_CAN_CLOSE;
2069        }
2070    }
2071
2072    return TR_CAN_KEEP;
2073}
2074
2075static tr_peer **
2076getPeersToClose( Torrent * t, tr_close_type_t closeType, int * setmeSize )
2077{
2078    int i, peerCount, outsize;
2079    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
2080    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2081
2082    assert( torrentIsLocked( t ) );
2083
2084    for( i = outsize = 0; i < peerCount; ++i )
2085        if( shouldPeerBeClosed( t, peers[i], peerCount ) == closeType )
2086            ret[outsize++] = peers[i];
2087
2088    *setmeSize = outsize;
2089    return ret;
2090}
2091
2092static int
2093compareCandidates( const void * va,
2094                   const void * vb )
2095{
2096    const struct peer_atom * a = *(const struct peer_atom**) va;
2097    const struct peer_atom * b = *(const struct peer_atom**) vb;
2098
2099    /* <Charles> Here we would probably want to try reconnecting to
2100     * peers that had most recently given us data. Lots of users have
2101     * trouble with resets due to their routers and/or ISPs. This way we
2102     * can quickly recover from an unwanted reset. So we sort
2103     * piece_data_time in descending order.
2104     */
2105
2106    if( a->piece_data_time != b->piece_data_time )
2107        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2108
2109    if( a->numFails != b->numFails )
2110        return a->numFails < b->numFails ? -1 : 1;
2111
2112    if( a->time != b->time )
2113        return a->time < b->time ? -1 : 1;
2114
2115    /* all other things being equal, prefer peers whose
2116     * information comes from a more reliable source */
2117    if( a->from != b->from )
2118        return a->from < b->from ? -1 : 1;
2119
2120    return 0;
2121}
2122
2123static int
2124getReconnectIntervalSecs( const struct peer_atom * atom )
2125{
2126    int          sec;
2127    const time_t now = time( NULL );
2128
2129    /* if we were recently connected to this peer and transferring piece
2130     * data, try to reconnect to them sooner rather that later -- we don't
2131     * want network troubles to get in the way of a good peer. */
2132    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2133        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2134
2135    /* don't allow reconnects more often than our minimum */
2136    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2137        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2138
2139    /* otherwise, the interval depends on how many times we've tried
2140     * and failed to connect to the peer */
2141    else switch( atom->numFails ) {
2142        case 0: sec = 0; break;
2143        case 1: sec = 5; break;
2144        case 2: sec = 2 * 60; break;
2145        case 3: sec = 15 * 60; break;
2146        case 4: sec = 30 * 60; break;
2147        case 5: sec = 60 * 60; break;
2148        default: sec = 120 * 60; break;
2149    }
2150
2151    return sec;
2152}
2153
2154static struct peer_atom **
2155getPeerCandidates( Torrent * t, int * setmeSize )
2156{
2157    int                 i, atomCount, retCount;
2158    struct peer_atom ** atoms;
2159    struct peer_atom ** ret;
2160    const time_t        now = time( NULL );
2161    const int           seed = tr_torrentIsSeed( t->tor );
2162
2163    assert( torrentIsLocked( t ) );
2164
2165    atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
2166    ret = tr_new( struct peer_atom*, atomCount );
2167    for( i = retCount = 0; i < atomCount; ++i )
2168    {
2169        int                interval;
2170        struct peer_atom * atom = atoms[i];
2171
2172        /* peer fed us too much bad data ... we only keep it around
2173         * now to weed it out in case someone sends it to us via pex */
2174        if( atom->myflags & MYFLAG_BANNED )
2175            continue;
2176
2177        /* peer was unconnectable before, so we're not going to keep trying.
2178         * this is needs a separate flag from `banned', since if they try
2179         * to connect to us later, we'll let them in */
2180        if( atom->myflags & MYFLAG_UNREACHABLE )
2181            continue;
2182
2183        /* we don't need two connections to the same peer... */
2184        if( peerIsInUse( t, &atom->addr ) )
2185            continue;
2186
2187        /* no need to connect if we're both seeds... */
2188        if( seed && ( ( atom->flags & ADDED_F_SEED_FLAG ) ||
2189                      ( atom->uploadOnly == UPLOAD_ONLY_YES ) ) )
2190            continue;
2191
2192        /* don't reconnect too often */
2193        interval = getReconnectIntervalSecs( atom );
2194        if( ( now - atom->time ) < interval )
2195        {
2196            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2197                    i, tr_peerIoAddrStr( &atom->addr, atom->port ), interval );
2198            continue;
2199        }
2200
2201        /* Don't connect to peers in our blocklist */
2202        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2203            continue;
2204
2205        ret[retCount++] = atom;
2206    }
2207
2208    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2209    *setmeSize = retCount;
2210    return ret;
2211}
2212
2213static void
2214closePeer( Torrent * t, tr_peer * peer )
2215{
2216    struct peer_atom * atom;
2217
2218    assert( t != NULL );
2219    assert( peer != NULL );
2220
2221    /* if we transferred piece data, then they might be good peers,
2222       so reset their `numFails' weight to zero.  otherwise we connected
2223       to them fruitlessly, so mark it as another fail */
2224    atom = getExistingAtom( t, &peer->addr );
2225    if( atom->piece_data_time )
2226        atom->numFails = 0;
2227    else
2228        ++atom->numFails;
2229
2230    tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
2231    removePeer( t, peer );
2232}
2233
2234static void
2235reconnectTorrent( Torrent * t )
2236{
2237    static time_t prevTime = 0;
2238    static int    newConnectionsThisSecond = 0;
2239    time_t        now;
2240
2241    now = time( NULL );
2242    if( prevTime != now )
2243    {
2244        prevTime = now;
2245        newConnectionsThisSecond = 0;
2246    }
2247
2248    if( !t->isRunning )
2249    {
2250        removeAllPeers( t );
2251    }
2252    else
2253    {
2254        int i;
2255        int canCloseCount;
2256        int mustCloseCount;
2257        int candidateCount;
2258        int maxCandidates;
2259        struct tr_peer ** canClose = getPeersToClose( t, TR_CAN_CLOSE, &canCloseCount );
2260        struct tr_peer ** mustClose = getPeersToClose( t, TR_MUST_CLOSE, &mustCloseCount );
2261        struct peer_atom ** candidates = getPeerCandidates( t, &candidateCount );
2262
2263        tordbg( t, "reconnect pulse for [%s]: "
2264                   "%d must-close connections, "
2265                   "%d can-close connections, "
2266                   "%d connection candidates, "
2267                   "%d atoms, "
2268                   "max per pulse is %d",
2269                   t->tor->info.name,
2270                   mustCloseCount,
2271                   canCloseCount,
2272                   candidateCount,
2273                   tr_ptrArraySize( &t->pool ),
2274                   MAX_RECONNECTIONS_PER_PULSE );
2275
2276        /* disconnect the really bad peers */
2277        for( i=0; i<mustCloseCount; ++i )
2278            closePeer( t, mustClose[i] );
2279
2280        /* decide how many peers can we try to add in this pass */
2281        maxCandidates = candidateCount;
2282        maxCandidates = MIN( maxCandidates, MAX_RECONNECTIONS_PER_PULSE );
2283        maxCandidates = MIN( maxCandidates, getMaxPeerCount( t->tor ) - getPeerCount( t ) );
2284        maxCandidates = MIN( maxCandidates, MAX_CONNECTIONS_PER_SECOND - newConnectionsThisSecond );
2285
2286        /* maybe disconnect some lesser peers, if we have candidates to replace them with */
2287        for( i=0; ( i<canCloseCount ) && ( i<maxCandidates ); ++i )
2288            closePeer( t, canClose[i] );
2289
2290        tordbg( t, "candidateCount is %d, MAX_RECONNECTIONS_PER_PULSE is %d,"
2291                   " getPeerCount(t) is %d, getMaxPeerCount(t) is %d, "
2292                   "newConnectionsThisSecond is %d, MAX_CONNECTIONS_PER_SECOND is %d",
2293                   candidateCount,
2294                   MAX_RECONNECTIONS_PER_PULSE,
2295                   getPeerCount( t ),
2296                   getMaxPeerCount( t->tor ),
2297                   newConnectionsThisSecond, MAX_CONNECTIONS_PER_SECOND );
2298
2299        /* add some new ones */
2300        for( i=0; i<maxCandidates; ++i )
2301        {
2302            tr_peerMgr        * mgr = t->manager;
2303            struct peer_atom  * atom = candidates[i];
2304            tr_peerIo         * io;
2305
2306            tordbg( t, "Starting an OUTGOING connection with %s",
2307                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2308
2309            io = tr_peerIoNewOutgoing( mgr->session, mgr->session->bandwidth, &atom->addr, atom->port, t->hash );
2310
2311            if( io == NULL )
2312            {
2313                tordbg( t, "peerIo not created; marking peer %s as unreachable",
2314                        tr_peerIoAddrStr( &atom->addr, atom->port ) );
2315                atom->myflags |= MYFLAG_UNREACHABLE;
2316            }
2317            else
2318            {
2319                tr_handshake * handshake = tr_handshakeNew( io,
2320                                                            mgr->session->encryptionMode,
2321                                                            myHandshakeDoneCB,
2322                                                            mgr );
2323
2324                assert( tr_peerIoGetTorrentHash( io ) );
2325
2326                tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewOutgoing() */
2327
2328                ++newConnectionsThisSecond;
2329
2330                tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
2331                                         handshakeCompare );
2332            }
2333
2334            atom->time = time( NULL );
2335        }
2336
2337        /* cleanup */
2338        tr_free( candidates );
2339        tr_free( mustClose );
2340        tr_free( canClose );
2341    }
2342}
2343
2344static int
2345reconnectPulse( void * vmgr )
2346{
2347    tr_torrent * tor = NULL;
2348    tr_peerMgr * mgr = vmgr;
2349    managerLock( mgr );
2350
2351    while(( tor = tr_torrentNext( mgr->session, tor )))
2352        if( tor->isRunning )
2353            reconnectTorrent( tor->torrentPeers );
2354
2355    managerUnlock( mgr );
2356    return TRUE;
2357}
2358
2359/****
2360*****
2361*****  BANDWIDTH ALLOCATION
2362*****
2363****/
2364
2365static void
2366pumpAllPeers( tr_peerMgr * mgr )
2367{
2368    tr_torrent * tor = NULL;
2369
2370    while(( tor = tr_torrentNext( mgr->session, tor )))
2371    {
2372        int j;
2373        Torrent * t = tor->torrentPeers;
2374
2375        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
2376        {
2377            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
2378            tr_peerMsgsPulse( peer->msgs );
2379        }
2380    }
2381}
2382
2383static int
2384bandwidthPulse( void * vmgr )
2385{
2386    tr_peerMgr * mgr = vmgr;
2387    managerLock( mgr );
2388
2389    /* FIXME: this next line probably isn't necessary... */
2390    pumpAllPeers( mgr );
2391
2392    /* allocate bandwidth to the peers */
2393    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
2394    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
2395
2396    managerUnlock( mgr );
2397    return TRUE;
2398}
Note: See TracBrowser for help on using the repository browser.