source: trunk/libtransmission/peer-mgr.c @ 7874

Last change on this file since 7874 was 7874, checked in by charles, 13 years ago

(trunk libT) partial revert of r7825: back out the refillPulse() changes

  • Property svn:keywords set to Date Rev Author Id
File size: 67.7 KB
Line 
1
2/*
3 * This file Copyright (C) 2007-2009 Charles Kerr <charles@transmissionbt.com>
4 *
5 * This file is licensed by the GPL version 2.  Works owned by the
6 * Transmission project are granted a special exemption to clause 2(b)
7 * so that the bulk of its code can remain under the MIT license.
8 * This exemption does not extend to derived works not owned by
9 * the Transmission project.
10 *
11 * $Id: peer-mgr.c 7874 2009-02-11 16:34:35Z charles $
12 */
13
14#include <assert.h>
15#include <errno.h>
16#include <string.h> /* memcpy, memcmp, strstr */
17#include <stdlib.h> /* qsort */
18#include <limits.h> /* INT_MAX */
19
20#include <event.h>
21
22#include "transmission.h"
23#include "session.h"
24#include "bandwidth.h"
25#include "bencode.h"
26#include "blocklist.h"
27#include "clients.h"
28#include "completion.h"
29#include "crypto.h"
30#include "fdlimit.h"
31#include "handshake.h"
32#include "inout.h" /* tr_ioTestPiece */
33#include "net.h"
34#include "peer-io.h"
35#include "peer-mgr.h"
36#include "peer-msgs.h"
37#include "ptrarray.h"
38#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
39#include "torrent.h"
40#include "trevent.h"
41#include "utils.h"
42#include "webseed.h"
43
44enum
45{
46    /* how frequently to change which peers are choked */
47    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
48
49    /* minimum interval for refilling peers' request lists */
50    REFILL_PERIOD_MSEC = 400,
51   
52    /* how frequently to reallocate bandwidth */
53    BANDWIDTH_PERIOD_MSEC = 500,
54
55    /* how frequently to decide which peers live and die */
56    RECONNECT_PERIOD_MSEC = 500,
57
58    /* when many peers are available, keep idle ones this long */
59    MIN_UPLOAD_IDLE_SECS = ( 30 ),
60
61    /* when few peers are available, keep idle ones this long */
62    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
63
64    /* max # of peers to ask fer per torrent per reconnect pulse */
65    MAX_RECONNECTIONS_PER_PULSE = 16,
66
67    /* max number of peers to ask for per second overall.
68    * this throttle is to avoid overloading the router */
69    MAX_CONNECTIONS_PER_SECOND = 32,
70
71    /* number of bad pieces a peer is allowed to send before we ban them */
72    MAX_BAD_PIECES_PER_PEER = 5,
73
74    /* use for bitwise operations w/peer_atom.myflags */
75    MYFLAG_BANNED = 1,
76
77    /* use for bitwise operations w/peer_atom.myflags */
78    /* unreachable for now... but not banned.
79     * if they try to connect to us it's okay */
80    MYFLAG_UNREACHABLE = 2,
81
82    /* the minimum we'll wait before attempting to reconnect to a peer */
83    MINIMUM_RECONNECT_INTERVAL_SECS = 5
84};
85
86
87/**
88***
89**/
90
91enum
92{
93    UPLOAD_ONLY_UKNOWN,
94    UPLOAD_ONLY_YES,
95    UPLOAD_ONLY_NO
96};
97
98/**
99 * Peer information that should be kept even before we've connected and
100 * after we've disconnected.  These are kept in a pool of peer_atoms to decide
101 * which ones would make good candidates for connecting to, and to watch out
102 * for banned peers.
103 *
104 * @see tr_peer
105 * @see tr_peermsgs
106 */
107struct peer_atom
108{
109    uint8_t     from;
110    uint8_t     flags;       /* these match the added_f flags */
111    uint8_t     myflags;     /* flags that aren't defined in added_f */
112    uint8_t     uploadOnly;  /* UPLOAD_ONLY_ */
113    tr_port     port;
114    uint16_t    numFails;
115    tr_address  addr;
116    time_t      time;        /* when the peer's connection status last changed */
117    time_t      piece_data_time;
118};
119
120typedef struct tr_torrent_peers
121{
122    tr_bool         isRunning;
123
124    uint8_t         hash[SHA_DIGEST_LENGTH];
125    int         *   pendingRequestCount;
126    tr_ptrArray     outgoingHandshakes; /* tr_handshake */
127    tr_ptrArray     pool; /* struct peer_atom */
128    tr_ptrArray     peers; /* tr_peer */
129    tr_ptrArray     webseeds; /* tr_webseed */
130    tr_timer *      refillTimer;
131    tr_torrent *    tor;
132    tr_peer *       optimistic; /* the optimistic peer, or NULL if none */
133
134    struct tr_peerMgr * manager;
135}
136Torrent;
137
138struct tr_peerMgr
139{
140    tr_session      * session;
141    tr_ptrArray       incomingHandshakes; /* tr_handshake */
142    tr_timer        * bandwidthTimer;
143    tr_timer        * rechokeTimer;
144    tr_timer        * reconnectTimer;
145};
146
147#define tordbg( t, ... ) \
148    do { \
149        if( tr_deepLoggingIsActive( ) ) \
150            tr_deepLog( __FILE__, __LINE__, t->tor->info.name, __VA_ARGS__ ); \
151    } while( 0 )
152
153#define dbgmsg( ... ) \
154    do { \
155        if( tr_deepLoggingIsActive( ) ) \
156            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
157    } while( 0 )
158
159/**
160***
161**/
162
163static TR_INLINE void
164managerLock( const struct tr_peerMgr * manager )
165{
166    tr_globalLock( manager->session );
167}
168
169static TR_INLINE void
170managerUnlock( const struct tr_peerMgr * manager )
171{
172    tr_globalUnlock( manager->session );
173}
174
175static TR_INLINE void
176torrentLock( Torrent * torrent )
177{
178    managerLock( torrent->manager );
179}
180
181static TR_INLINE void
182torrentUnlock( Torrent * torrent )
183{
184    managerUnlock( torrent->manager );
185}
186
187static TR_INLINE int
188torrentIsLocked( const Torrent * t )
189{
190    return tr_globalIsLocked( t->manager->session );
191}
192
193/**
194***
195**/
196
197static int
198handshakeCompareToAddr( const void * va, const void * vb )
199{
200    const tr_handshake * a = va;
201
202    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
203}
204
205static int
206handshakeCompare( const void * a, const void * b )
207{
208    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
209}
210
211static tr_handshake*
212getExistingHandshake( tr_ptrArray      * handshakes,
213                      const tr_address * addr )
214{
215    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
216}
217
218static int
219comparePeerAtomToAddress( const void * va, const void * vb )
220{
221    const struct peer_atom * a = va;
222
223    return tr_compareAddresses( &a->addr, vb );
224}
225
226static int
227comparePeerAtoms( const void * va, const void * vb )
228{
229    const struct peer_atom * b = vb;
230
231    return comparePeerAtomToAddress( va, &b->addr );
232}
233
234/**
235***
236**/
237
238static Torrent*
239getExistingTorrent( tr_peerMgr *    manager,
240                    const uint8_t * hash )
241{
242    tr_torrent * tor = tr_torrentFindFromHash( manager->session, hash );
243
244    return tor == NULL ? NULL : tor->torrentPeers;
245}
246
247static int
248peerCompare( const void * va, const void * vb )
249{
250    const tr_peer * a = va;
251    const tr_peer * b = vb;
252
253    return tr_compareAddresses( &a->addr, &b->addr );
254}
255
256static int
257peerCompareToAddr( const void * va, const void * vb )
258{
259    const tr_peer * a = va;
260
261    return tr_compareAddresses( &a->addr, vb );
262}
263
264static tr_peer*
265getExistingPeer( Torrent          * torrent,
266                 const tr_address * addr )
267{
268    assert( torrentIsLocked( torrent ) );
269    assert( addr );
270
271    return tr_ptrArrayFindSorted( &torrent->peers, addr, peerCompareToAddr );
272}
273
274static struct peer_atom*
275getExistingAtom( const Torrent    * t,
276                 const tr_address * addr )
277{
278    Torrent * tt = (Torrent*)t;
279    assert( torrentIsLocked( t ) );
280    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
281}
282
283static tr_bool
284peerIsInUse( const Torrent    * ct,
285             const tr_address * addr )
286{
287    Torrent * t = (Torrent*) ct;
288
289    assert( torrentIsLocked ( t ) );
290
291    return getExistingPeer( t, addr )
292        || getExistingHandshake( &t->outgoingHandshakes, addr )
293        || getExistingHandshake( &t->manager->incomingHandshakes, addr );
294}
295
296static tr_peer*
297peerConstructor( const tr_address * addr )
298{
299    tr_peer * p;
300    p = tr_new0( tr_peer, 1 );
301    p->addr = *addr;
302    return p;
303}
304
305static tr_peer*
306getPeer( Torrent          * torrent,
307         const tr_address * addr )
308{
309    tr_peer * peer;
310
311    assert( torrentIsLocked( torrent ) );
312
313    peer = getExistingPeer( torrent, addr );
314
315    if( peer == NULL )
316    {
317        peer = peerConstructor( addr );
318        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
319    }
320
321    return peer;
322}
323
324static void
325peerDestructor( tr_peer * peer )
326{
327    assert( peer );
328
329    if( peer->msgs != NULL )
330    {
331        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
332        tr_peerMsgsFree( peer->msgs );
333    }
334
335    tr_peerIoClear( peer->io );
336    tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
337
338    tr_bitfieldFree( peer->have );
339    tr_bitfieldFree( peer->blame );
340    tr_free( peer->client );
341
342    tr_free( peer );
343}
344
345static void
346removePeer( Torrent * t,
347            tr_peer * peer )
348{
349    tr_peer *          removed;
350    struct peer_atom * atom;
351
352    assert( torrentIsLocked( t ) );
353
354    atom = getExistingAtom( t, &peer->addr );
355    assert( atom );
356    atom->time = time( NULL );
357
358    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
359    assert( removed == peer );
360    peerDestructor( removed );
361}
362
363static void
364removeAllPeers( Torrent * t )
365{
366    while( !tr_ptrArrayEmpty( &t->peers ) )
367        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
368}
369
370static void
371torrentDestructor( void * vt )
372{
373    Torrent * t = vt;
374    uint8_t   hash[SHA_DIGEST_LENGTH];
375
376    assert( t );
377    assert( !t->isRunning );
378    assert( torrentIsLocked( t ) );
379    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
380    assert( tr_ptrArrayEmpty( &t->peers ) );
381
382    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
383
384    tr_timerFree( &t->refillTimer );
385
386    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
387    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
388    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
389    tr_ptrArrayDestruct( &t->peers, NULL );
390
391    tr_free( t->pendingRequestCount );
392    tr_free( t );
393}
394
395static void peerCallbackFunc( void * vpeer,
396                              void * vevent,
397                              void * vt );
398
399static Torrent*
400torrentConstructor( tr_peerMgr * manager,
401                    tr_torrent * tor )
402{
403    int       i;
404    Torrent * t;
405
406    t = tr_new0( Torrent, 1 );
407    t->manager = manager;
408    t->tor = tor;
409    t->pool = TR_PTR_ARRAY_INIT;
410    t->peers = TR_PTR_ARRAY_INIT;
411    t->webseeds = TR_PTR_ARRAY_INIT;
412    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
413    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
414
415    for( i = 0; i < tor->info.webseedCount; ++i )
416    {
417        tr_webseed * w =
418            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
419        tr_ptrArrayAppend( &t->webseeds, w );
420    }
421
422    return t;
423}
424
425
426static int bandwidthPulse ( void * vmgr );
427static int rechokePulse   ( void * vmgr );
428static int reconnectPulse ( void * vmgr );
429
430tr_peerMgr*
431tr_peerMgrNew( tr_session * session )
432{
433    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
434
435    m->session = session;
436    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
437    m->bandwidthTimer = tr_timerNew( session, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
438    m->rechokeTimer   = tr_timerNew( session, rechokePulse,   m, RECHOKE_PERIOD_MSEC );
439    m->reconnectTimer = tr_timerNew( session, reconnectPulse, m, RECONNECT_PERIOD_MSEC );
440
441    rechokePulse( m );
442
443    return m;
444}
445
446void
447tr_peerMgrFree( tr_peerMgr * manager )
448{
449    managerLock( manager );
450
451    tr_timerFree( &manager->reconnectTimer );
452    tr_timerFree( &manager->rechokeTimer );
453    tr_timerFree( &manager->bandwidthTimer );
454
455    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
456     * the item from manager->handshakes, so this is a little roundabout... */
457    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
458        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
459
460    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
461
462    managerUnlock( manager );
463    tr_free( manager );
464}
465
466static int
467clientIsDownloadingFrom( const tr_peer * peer )
468{
469    return peer->clientIsInterested && !peer->clientIsChoked;
470}
471
472static int
473clientIsUploadingTo( const tr_peer * peer )
474{
475    return peer->peerIsInterested && !peer->peerIsChoked;
476}
477
478/***
479****
480***/
481
482tr_bool
483tr_peerMgrPeerIsSeed( const tr_torrent  * tor,
484                      const tr_address  * addr )
485{
486    tr_bool isSeed = FALSE;
487    const Torrent * t = tor->torrentPeers;
488    const struct peer_atom * atom = getExistingAtom( t, addr );
489
490    if( atom )
491        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
492
493    return isSeed;
494}
495
496/****
497*****
498*****  REFILL
499*****
500****/
501
502static void
503assertValidPiece( Torrent * t, tr_piece_index_t piece )
504{
505    assert( t );
506    assert( t->tor );
507    assert( piece < t->tor->info.pieceCount );
508}
509
510static int
511getPieceRequests( Torrent * t, tr_piece_index_t piece )
512{
513    assertValidPiece( t, piece );
514
515    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
516}
517
518static void
519incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
520{
521    assertValidPiece( t, piece );
522
523    if( t->pendingRequestCount == NULL )
524        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
525    t->pendingRequestCount[piece]++;
526}
527
528static void
529decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
530{
531    assertValidPiece( t, piece );
532
533    if( t->pendingRequestCount )
534        t->pendingRequestCount[piece]--;
535}
536
537struct tr_refill_piece
538{
539    tr_priority_t    priority;
540    uint32_t         piece;
541    uint32_t         peerCount;
542    int              random;
543    int              pendingRequestCount;
544    int              missingBlockCount;
545};
546
547static int
548compareRefillPiece( const void * aIn, const void * bIn )
549{
550    const struct tr_refill_piece * a = aIn;
551    const struct tr_refill_piece * b = bIn;
552
553    /* if one piece has a higher priority, it goes first */
554    if( a->priority != b->priority )
555        return a->priority > b->priority ? -1 : 1;
556
557    /* have a per-priority endgame */
558    if( a->pendingRequestCount != b->pendingRequestCount )
559        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
560
561    /* fewer missing pieces goes first */
562    if( a->missingBlockCount != b->missingBlockCount )
563        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
564
565    /* otherwise if one has fewer peers, it goes first */
566    if( a->peerCount != b->peerCount )
567        return a->peerCount < b->peerCount ? -1 : 1;
568
569    /* otherwise go with our random seed */
570    if( a->random != b->random )
571        return a->random < b->random ? -1 : 1;
572
573    return 0;
574}
575
576static tr_piece_index_t *
577getPreferredPieces( Torrent * t, tr_piece_index_t * pieceCount )
578{
579    const tr_torrent  * tor = t->tor;
580    const tr_info     * inf = &tor->info;
581    tr_piece_index_t    i;
582    tr_piece_index_t    poolSize = 0;
583    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
584    int                 peerCount;
585    const tr_peer    ** peers;
586
587    assert( torrentIsLocked( t ) );
588
589    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
590    peerCount = tr_ptrArraySize( &t->peers );
591
592    /* make a list of the pieces that we want but don't have */
593    for( i = 0; i < inf->pieceCount; ++i )
594        if( !tor->info.pieces[i].dnd
595                && !tr_cpPieceIsComplete( &tor->completion, i ) )
596            pool[poolSize++] = i;
597
598    /* sort the pool by which to request next */
599    if( poolSize > 1 )
600    {
601        tr_piece_index_t j;
602        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
603
604        for( j = 0; j < poolSize; ++j )
605        {
606            int k;
607            const tr_piece_index_t piece = pool[j];
608            struct tr_refill_piece * setme = p + j;
609
610            setme->piece = piece;
611            setme->priority = inf->pieces[piece].priority;
612            setme->peerCount = 0;
613            setme->random = tr_cryptoWeakRandInt( INT_MAX );
614            setme->pendingRequestCount = getPieceRequests( t, piece );
615            setme->missingBlockCount
616                         = tr_cpMissingBlocksInPiece( &tor->completion, piece );
617
618            for( k = 0; k < peerCount; ++k )
619            {
620                const tr_peer * peer = peers[k];
621                if( peer->peerIsInterested
622                        && !peer->clientIsChoked
623                        && tr_bitfieldHas( peer->have, piece ) )
624                    ++setme->peerCount;
625            }
626        }
627
628        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
629               compareRefillPiece );
630
631        for( j = 0; j < poolSize; ++j )
632            pool[j] = p[j].piece;
633
634        tr_free( p );
635    }
636
637    *pieceCount = poolSize;
638    return pool;
639}
640
641struct tr_blockIterator
642{
643    Torrent * t;
644    tr_block_index_t blockIndex, blockCount, *blocks;
645    tr_piece_index_t pieceIndex, pieceCount, *pieces;
646};
647
648static struct tr_blockIterator*
649blockIteratorNew( Torrent * t )
650{
651    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
652    i->t = t;
653    i->pieces = getPreferredPieces( t, &i->pieceCount );
654    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCountInPiece );
655    return i;
656}
657
658static int
659blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
660{
661    int found;
662    Torrent * t = i->t;
663    tr_torrent * tor = t->tor;
664
665    while( ( i->blockIndex == i->blockCount )
666        && ( i->pieceIndex < i->pieceCount ) )
667    {
668        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
669        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
670        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
671        tr_block_index_t block;
672
673        assert( index < tor->info.pieceCount );
674
675        i->blockCount = 0;
676        i->blockIndex = 0;
677        for( block=b; block!=e; ++block )
678            if( !tr_cpBlockIsComplete( &tor->completion, block ) )
679                i->blocks[i->blockCount++] = block;
680    }
681
682    assert( i->blockCount <= tor->blockCountInPiece );
683
684    if(( found = ( i->blockIndex < i->blockCount )))
685        *setme = i->blocks[i->blockIndex++];
686
687    return found;
688}
689
690static void
691blockIteratorFree( struct tr_blockIterator * i )
692{
693    tr_free( i->blocks );
694    tr_free( i->pieces );
695    tr_free( i );
696}
697
698static tr_peer**
699getPeersUploadingToClient( Torrent * t,
700                           int *     setmeCount )
701{
702    int j;
703    int peerCount = 0;
704    int retCount = 0;
705    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
706    tr_peer ** ret = tr_new( tr_peer *, peerCount );
707
708    j = 0; /* this is a temporary test to make sure we walk through all the peers */
709    if( peerCount )
710    {
711        /* Get a list of peers we're downloading from.
712           Pick a different starting point each time so all peers
713           get a chance at being the first in line */
714        const int fencepost = tr_cryptoWeakRandInt( peerCount );
715        int i = fencepost;
716        do {
717            if( clientIsDownloadingFrom( peers[i] ) )
718                ret[retCount++] = peers[i];
719            i = ( i + 1 ) % peerCount;
720            ++j;
721        } while( i != fencepost );
722    }
723    assert( j == peerCount );
724    *setmeCount = retCount;
725    return ret;
726}
727
728static uint32_t
729getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
730{
731    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
732    const uint64_t blockPos = tor->blockSize * b;
733    assert( blockPos >= piecePos );
734    return (uint32_t)( blockPos - piecePos );
735}
736
737static int
738refillPulse( void * vtorrent )
739{
740    tr_block_index_t block;
741    int peerCount;
742    int webseedCount;
743    tr_peer ** peers;
744    tr_webseed ** webseeds;
745    struct tr_blockIterator * blockIterator;
746    Torrent * t = vtorrent;
747    tr_torrent * tor = t->tor;
748
749    if( !t->isRunning )
750        return TRUE;
751    if( tr_torrentIsSeed( t->tor ) )
752        return TRUE;
753
754    torrentLock( t );
755    tordbg( t, "Refilling Request Buffers..." );
756
757    blockIterator = blockIteratorNew( t );
758    peers = getPeersUploadingToClient( t, &peerCount );
759    webseedCount = tr_ptrArraySize( &t->webseeds );
760    webseeds = tr_memdup( tr_ptrArrayBase( &t->webseeds ),
761                          webseedCount * sizeof( tr_webseed* ) );
762
763    while( ( webseedCount || peerCount )
764        && blockIteratorNext( blockIterator, &block ) )
765    {
766        int j;
767        int handled = FALSE;
768
769        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
770        const uint32_t offset = getBlockOffsetInPiece( tor, block );
771        const uint32_t length = tr_torBlockCountBytes( tor, block );
772
773        /* find a peer who can ask for this block */
774        for( j=0; !handled && j<peerCount; )
775        {
776            const tr_addreq_t val = tr_peerMsgsAddRequest( peers[j]->msgs, index, offset, length );
777            switch( val )
778            {
779                case TR_ADDREQ_FULL:
780                case TR_ADDREQ_CLIENT_CHOKED:
781                    peers[j] = peers[--peerCount];
782                    break;
783
784                case TR_ADDREQ_MISSING:
785                case TR_ADDREQ_DUPLICATE:
786                    ++j;
787                    break;
788
789                case TR_ADDREQ_OK:
790                    incrementPieceRequests( t, index );
791                    handled = TRUE;
792                    break;
793
794                default:
795                    assert( 0 && "unhandled value" );
796                    break;
797            }
798        }
799
800        /* maybe one of the webseeds can do it */
801        for( j=0; !handled && j<webseedCount; )
802        {
803            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j], index, offset, length );
804            switch( val )
805            {
806                case TR_ADDREQ_FULL:
807                    webseeds[j] = webseeds[--webseedCount];
808                    break;
809
810                case TR_ADDREQ_OK:
811                    incrementPieceRequests( t, index );
812                    handled = TRUE;
813                    break;
814
815                default:
816                    assert( 0 && "unhandled value" );
817                    break;
818            }
819        }
820    }
821
822    /* cleanup */
823    blockIteratorFree( blockIterator );
824    tr_free( webseeds );
825    tr_free( peers );
826
827    t->refillTimer = NULL;
828    torrentUnlock( t );
829    return FALSE;
830}
831
832static void
833broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
834{
835    size_t i;
836    size_t peerCount;
837    tr_peer ** peers;
838
839    assert( torrentIsLocked( t ) );
840
841    tordbg( t, "got a block; cancelling any duplicate requests from peers %"PRIu32":%"PRIu32"->%"PRIu32, index, offset, length );
842
843    peerCount = tr_ptrArraySize( &t->peers );
844    peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
845    for( i=0; i<peerCount; ++i )
846        if( peers[i]->msgs )
847            tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
848}
849
850static void
851addStrike( Torrent * t,
852           tr_peer * peer )
853{
854    tordbg( t, "increasing peer %s strike count to %d",
855            tr_peerIoAddrStr( &peer->addr,
856                              peer->port ), peer->strikes + 1 );
857
858    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
859    {
860        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
861        atom->myflags |= MYFLAG_BANNED;
862        peer->doPurge = 1;
863        tordbg( t, "banning peer %s", tr_peerIoAddrStr( &atom->addr, atom->port ) );
864    }
865}
866
867static void
868gotBadPiece( Torrent *        t,
869             tr_piece_index_t pieceIndex )
870{
871    tr_torrent *   tor = t->tor;
872    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
873
874    tor->corruptCur += byteCount;
875    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
876}
877
878static void
879refillSoon( Torrent * t )
880{
881    if( t->refillTimer == NULL )
882        t->refillTimer = tr_timerNew( t->manager->session,
883                                      refillPulse, t,
884                                      REFILL_PERIOD_MSEC );
885}
886
887static void
888peerSuggestedPiece( Torrent            * t UNUSED,
889                    tr_peer            * peer UNUSED,
890                    tr_piece_index_t     pieceIndex UNUSED,
891                    int                  isFastAllowed UNUSED )
892{
893#if 0
894    assert( t );
895    assert( peer );
896    assert( peer->msgs );
897
898    /* is this a valid piece? */
899    if(  pieceIndex >= t->tor->info.pieceCount )
900        return;
901
902    /* don't ask for it if we've already got it */
903    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
904        return;
905
906    /* don't ask for it if they don't have it */
907    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
908        return;
909
910    /* don't ask for it if we're choked and it's not fast */
911    if( !isFastAllowed && peer->clientIsChoked )
912        return;
913
914    /* request the blocks that we don't have in this piece */
915    {
916        tr_block_index_t block;
917        const tr_torrent * tor = t->tor;
918        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
919        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
920
921        for( block=start; block<end; ++block )
922        {
923            if( !tr_cpBlockIsComplete( tor->completion, block ) )
924            {
925                const uint32_t offset = getBlockOffsetInPiece( tor, block );
926                const uint32_t length = tr_torBlockCountBytes( tor, block );
927                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
928                incrementPieceRequests( t, pieceIndex );
929            }
930        }
931    }
932#endif
933}
934
935static void
936peerCallbackFunc( void * vpeer, void * vevent, void * vt )
937{
938    tr_peer * peer = vpeer; /* may be NULL if peer is a webseed */
939    Torrent * t = vt;
940    const tr_peer_event * e = vevent;
941
942    torrentLock( t );
943
944    switch( e->eventType )
945    {
946        case TR_PEER_UPLOAD_ONLY:
947            /* update our atom */
948            if( peer ) {
949                struct peer_atom * a = getExistingAtom( t, &peer->addr );
950                a->uploadOnly = e->uploadOnly ? UPLOAD_ONLY_YES : UPLOAD_ONLY_NO;
951            }
952            break;
953
954        case TR_PEER_NEED_REQ:
955            refillSoon( t );
956            break;
957
958        case TR_PEER_CANCEL:
959            decrementPieceRequests( t, e->pieceIndex );
960            break;
961
962        case TR_PEER_PEER_GOT_DATA:
963        {
964            const time_t now = time( NULL );
965            tr_torrent * tor = t->tor;
966
967            tor->activityDate = now;
968
969            if( e->wasPieceData )
970                tor->uploadedCur += e->length;
971
972            /* update the stats */
973            if( e->wasPieceData )
974                tr_statsAddUploaded( tor->session, e->length );
975
976            /* update our atom */
977            if( peer ) {
978                struct peer_atom * a = getExistingAtom( t, &peer->addr );
979                if( e->wasPieceData )
980                    a->piece_data_time = now;
981            }
982
983            break;
984        }
985
986        case TR_PEER_CLIENT_GOT_SUGGEST:
987            if( peer )
988                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
989            break;
990
991        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
992            if( peer )
993                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
994            break;
995
996        case TR_PEER_CLIENT_GOT_DATA:
997        {
998            const time_t now = time( NULL );
999            tr_torrent * tor = t->tor;
1000            tor->activityDate = now;
1001
1002            /* only add this to downloadedCur if we got it from a peer --
1003             * webseeds shouldn't count against our ratio.  As one tracker
1004             * admin put it, "Those pieces are downloaded directly from the
1005             * content distributor, not the peers, it is the tracker's job
1006             * to manage the swarms, not the web server and does not fit
1007             * into the jurisdiction of the tracker." */
1008            if( peer && e->wasPieceData )
1009                tor->downloadedCur += e->length;
1010
1011            /* update the stats */ 
1012            if( e->wasPieceData )
1013                tr_statsAddDownloaded( tor->session, e->length );
1014
1015            /* update our atom */
1016            if( peer ) {
1017                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1018                if( e->wasPieceData )
1019                    a->piece_data_time = now;
1020            }
1021
1022            break;
1023        }
1024
1025        case TR_PEER_PEER_PROGRESS:
1026        {
1027            if( peer )
1028            {
1029                struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1030                const int peerIsSeed = e->progress >= 1.0;
1031                if( peerIsSeed ) {
1032                    tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1033                    atom->flags |= ADDED_F_SEED_FLAG;
1034                } else {
1035                    tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1036                    atom->flags &= ~ADDED_F_SEED_FLAG;
1037                }
1038            }
1039            break;
1040        }
1041
1042        case TR_PEER_CLIENT_GOT_BLOCK:
1043        {
1044            tr_torrent * tor = t->tor;
1045
1046            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1047
1048            tr_cpBlockAdd( &tor->completion, block );
1049            decrementPieceRequests( t, e->pieceIndex );
1050
1051            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1052
1053            if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1054            {
1055                const tr_piece_index_t p = e->pieceIndex;
1056                const tr_bool ok = tr_ioTestPiece( tor, p, NULL, 0 );
1057
1058                if( !ok )
1059                {
1060                    tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1061                               (unsigned long)p );
1062                }
1063
1064                tr_torrentSetHasPiece( tor, p, ok );
1065                tr_torrentSetPieceChecked( tor, p, TRUE );
1066                tr_peerMgrSetBlame( tor, p, ok );
1067
1068                if( !ok )
1069                {
1070                    gotBadPiece( t, p );
1071                }
1072                else
1073                {
1074                    int i;
1075                    int peerCount;
1076                    tr_peer ** peers;
1077                    tr_file_index_t fileIndex;
1078
1079                    peerCount = tr_ptrArraySize( &t->peers );
1080                    peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1081                    for( i=0; i<peerCount; ++i )
1082                        tr_peerMsgsHave( peers[i]->msgs, p );
1083
1084                    for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex )
1085                    {
1086                        const tr_file * file = &tor->info.files[fileIndex];
1087                        if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) && tr_cpFileIsComplete( &tor->completion, fileIndex ) )
1088                        {
1089                            char * path = tr_buildPath( tor->downloadDir, file->name, NULL );
1090                            tordbg( t, "closing recently-completed file \"%s\"", path );
1091                            tr_fdFileClose( path );
1092                            tr_free( path );
1093                        }
1094                    }
1095                }
1096
1097                tr_torrentRecheckCompleteness( tor );
1098            }
1099            break;
1100        }
1101
1102        case TR_PEER_ERROR:
1103            if( e->err == EINVAL )
1104            {
1105                addStrike( t, peer );
1106                peer->doPurge = 1;
1107                tordbg( t, "setting %s doPurge flag because we got an EINVAL error", tr_peerIoAddrStr( &peer->addr, peer->port ) );
1108            }
1109            else if( ( e->err == ERANGE )
1110                  || ( e->err == EMSGSIZE )
1111                  || ( e->err == ENOTCONN ) )
1112            {
1113                /* some protocol error from the peer */
1114                peer->doPurge = 1;
1115                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error", tr_peerIoAddrStr( &peer->addr, peer->port ) );
1116            }
1117            else /* a local error, such as an IO error */
1118            {
1119                t->tor->error = e->err;
1120                tr_strlcpy( t->tor->errorString,
1121                            tr_strerror( t->tor->error ),
1122                            sizeof( t->tor->errorString ) );
1123                tr_torrentStop( t->tor );
1124            }
1125            break;
1126
1127        default:
1128            assert( 0 );
1129    }
1130
1131    torrentUnlock( t );
1132}
1133
1134static void
1135ensureAtomExists( Torrent          * t,
1136                  const tr_address * addr,
1137                  tr_port            port,
1138                  uint8_t            flags,
1139                  uint8_t            from )
1140{
1141    if( getExistingAtom( t, addr ) == NULL )
1142    {
1143        struct peer_atom * a;
1144        a = tr_new0( struct peer_atom, 1 );
1145        a->addr = *addr;
1146        a->port = port;
1147        a->flags = flags;
1148        a->from = from;
1149        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr( &a->addr, a->port ) );
1150        tr_ptrArrayInsertSorted( &t->pool, a, comparePeerAtoms );
1151    }
1152}
1153
1154static int
1155getMaxPeerCount( const tr_torrent * tor )
1156{
1157    return tor->maxConnectedPeers;
1158}
1159
1160static int
1161getPeerCount( const Torrent * t )
1162{
1163    return tr_ptrArraySize( &t->peers );// + tr_ptrArraySize( &t->outgoingHandshakes );
1164}
1165
1166/* FIXME: this is kind of a mess. */
1167static tr_bool
1168myHandshakeDoneCB( tr_handshake  * handshake,
1169                   tr_peerIo     * io,
1170                   tr_bool         isConnected,
1171                   const uint8_t * peer_id,
1172                   void          * vmanager )
1173{
1174    tr_bool            ok = isConnected;
1175    tr_bool            success = FALSE;
1176    tr_port            port;
1177    const tr_address * addr;
1178    tr_peerMgr       * manager = vmanager;
1179    Torrent          * t;
1180    tr_handshake     * ours;
1181
1182    assert( io );
1183    assert( tr_isBool( ok ) );
1184
1185    t = tr_peerIoHasTorrentHash( io )
1186        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1187        : NULL;
1188
1189    if( tr_peerIoIsIncoming ( io ) )
1190        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1191                                        handshake, handshakeCompare );
1192    else if( t )
1193        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1194                                        handshake, handshakeCompare );
1195    else
1196        ours = handshake;
1197
1198    assert( ours );
1199    assert( ours == handshake );
1200
1201    if( t )
1202        torrentLock( t );
1203
1204    addr = tr_peerIoGetAddress( io, &port );
1205
1206    if( !ok || !t || !t->isRunning )
1207    {
1208        if( t )
1209        {
1210            struct peer_atom * atom = getExistingAtom( t, addr );
1211            if( atom )
1212                ++atom->numFails;
1213        }
1214    }
1215    else /* looking good */
1216    {
1217        struct peer_atom * atom;
1218        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1219        atom = getExistingAtom( t, addr );
1220        atom->time = time( NULL );
1221        atom->piece_data_time = 0;
1222
1223        if( atom->myflags & MYFLAG_BANNED )
1224        {
1225            tordbg( t, "banned peer %s tried to reconnect",
1226                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
1227        }
1228        else if( tr_peerIoIsIncoming( io )
1229               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1230
1231        {
1232        }
1233        else
1234        {
1235            tr_peer * peer = getExistingPeer( t, addr );
1236
1237            if( peer ) /* we already have this peer */
1238            {
1239            }
1240            else
1241            {
1242                peer = getPeer( t, addr );
1243                tr_free( peer->client );
1244
1245                if( !peer_id )
1246                    peer->client = NULL;
1247                else {
1248                    char client[128];
1249                    tr_clientForId( client, sizeof( client ), peer_id );
1250                    peer->client = tr_strdup( client );
1251                }
1252
1253                peer->port = port;
1254                peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is
1255                                                                balanced by our unref in peerDestructor()  */
1256                tr_peerIoSetParent( peer->io, t->tor->bandwidth );
1257                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1258
1259                success = TRUE;
1260            }
1261        }
1262    }
1263
1264    if( t )
1265        torrentUnlock( t );
1266
1267    return success;
1268}
1269
1270void
1271tr_peerMgrAddIncoming( tr_peerMgr * manager,
1272                       tr_address * addr,
1273                       tr_port      port,
1274                       int          socket )
1275{
1276    managerLock( manager );
1277
1278    if( tr_sessionIsAddressBlocked( manager->session, addr ) )
1279    {
1280        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
1281        tr_netClose( socket );
1282    }
1283    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
1284    {
1285        tr_netClose( socket );
1286    }
1287    else /* we don't have a connetion to them yet... */
1288    {
1289        tr_peerIo *    io;
1290        tr_handshake * handshake;
1291
1292        io = tr_peerIoNewIncoming( manager->session, manager->session->bandwidth, addr, port, socket );
1293
1294        handshake = tr_handshakeNew( io,
1295                                     manager->session->encryptionMode,
1296                                     myHandshakeDoneCB,
1297                                     manager );
1298
1299        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
1300
1301        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
1302                                 handshakeCompare );
1303    }
1304
1305    managerUnlock( manager );
1306}
1307
1308static tr_bool
1309tr_isPex( const tr_pex * pex )
1310{
1311    return pex && tr_isAddress( &pex->addr );
1312}
1313
1314void
1315tr_peerMgrAddPex( tr_torrent   *  tor,
1316                  uint8_t         from,
1317                  const tr_pex *  pex )
1318{
1319    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
1320    {
1321        Torrent * t = tor->torrentPeers;
1322        managerLock( t->manager );
1323
1324        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
1325            if( tr_isValidPeerAddress( &pex->addr, pex->port ) )
1326                ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from );
1327
1328        managerUnlock( t->manager );
1329    }
1330}
1331
1332tr_pex *
1333tr_peerMgrCompactToPex( const void *    compact,
1334                        size_t          compactLen,
1335                        const uint8_t * added_f,
1336                        size_t          added_f_len,
1337                        size_t *        pexCount )
1338{
1339    size_t          i;
1340    size_t          n = compactLen / 6;
1341    const uint8_t * walk = compact;
1342    tr_pex *        pex = tr_new0( tr_pex, n );
1343
1344    for( i = 0; i < n; ++i )
1345    {
1346        pex[i].addr.type = TR_AF_INET;
1347        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
1348        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1349        if( added_f && ( n == added_f_len ) )
1350            pex[i].flags = added_f[i];
1351    }
1352
1353    *pexCount = n;
1354    return pex;
1355}
1356
1357tr_pex *
1358tr_peerMgrCompact6ToPex( const void    * compact,
1359                         size_t          compactLen,
1360                         const uint8_t * added_f,
1361                         size_t          added_f_len,
1362                         size_t        * pexCount )
1363{
1364    size_t          i;
1365    size_t          n = compactLen / 18;
1366    const uint8_t * walk = compact;
1367    tr_pex *        pex = tr_new0( tr_pex, n );
1368   
1369    for( i = 0; i < n; ++i )
1370    {
1371        pex[i].addr.type = TR_AF_INET6;
1372        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
1373        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1374        if( added_f && ( n == added_f_len ) )
1375            pex[i].flags = added_f[i];
1376    }
1377   
1378    *pexCount = n;
1379    return pex;
1380}
1381
1382tr_pex *
1383tr_peerMgrArrayToPex( const void * array,
1384                      size_t       arrayLen,
1385                      size_t      * pexCount )
1386{
1387    size_t          i;
1388    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
1389    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
1390    const uint8_t * walk = array;
1391    tr_pex        * pex = tr_new0( tr_pex, n );
1392   
1393    for( i = 0 ; i < n ; i++ ) {
1394        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
1395        tr_suspectAddress( &pex[i].addr, "tracker"  );
1396        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
1397        pex[i].flags = 0x00;
1398        walk += sizeof( tr_address ) + 2;
1399    }
1400   
1401    *pexCount = n;
1402    return pex;
1403}
1404
1405/**
1406***
1407**/
1408
1409void
1410tr_peerMgrSetBlame( tr_torrent     * tor,
1411                    tr_piece_index_t pieceIndex,
1412                    int              success )
1413{
1414    if( !success )
1415    {
1416        int        peerCount, i;
1417        Torrent *  t = tor->torrentPeers;
1418        tr_peer ** peers;
1419
1420        assert( torrentIsLocked( t ) );
1421
1422        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
1423        for( i = 0; i < peerCount; ++i )
1424        {
1425            tr_peer * peer = peers[i];
1426            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1427            {
1428                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1429                        tr_peerIoAddrStr( &peer->addr, peer->port ),
1430                        pieceIndex, (int)peer->strikes + 1 );
1431                addStrike( t, peer );
1432            }
1433        }
1434    }
1435}
1436
1437int
1438tr_pexCompare( const void * va, const void * vb )
1439{
1440    const tr_pex * a = va;
1441    const tr_pex * b = vb;
1442    int i;
1443
1444    assert( tr_isPex( a ) );
1445    assert( tr_isPex( b ) );
1446
1447    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
1448        return i;
1449
1450    if( a->port != b->port )
1451        return a->port < b->port ? -1 : 1;
1452
1453    return 0;
1454}
1455
1456static int
1457peerPrefersCrypto( const tr_peer * peer )
1458{
1459    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1460        return TRUE;
1461
1462    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1463        return FALSE;
1464
1465    return tr_peerIoIsEncrypted( peer->io );
1466}
1467
1468int
1469tr_peerMgrGetPeers( tr_torrent      * tor,
1470                    tr_pex         ** setme_pex,
1471                    uint8_t           af)
1472{
1473    int peersReturning = 0;
1474    const Torrent * t = tor->torrentPeers;
1475
1476    managerLock( t->manager );
1477
1478    {
1479        int i;
1480        const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1481        const int peerCount = tr_ptrArraySize( &t->peers );
1482        /* for now, this will waste memory on torrents that have both
1483         * ipv6 and ipv4 peers */
1484        tr_pex * pex = tr_new( tr_pex, peerCount );
1485        tr_pex * walk = pex;
1486
1487        for( i=0; i<peerCount; ++i )
1488        {
1489            const tr_peer * peer = peers[i];
1490            if( peer->addr.type == af )
1491            {
1492                const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1493
1494                assert( tr_isAddress( &peer->addr ) );
1495                walk->addr = peer->addr;
1496                walk->port = peer->port;
1497                walk->flags = 0;
1498                if( peerPrefersCrypto( peer ) )
1499                    walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1500                if( ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 ) )
1501                    walk->flags |= ADDED_F_SEED_FLAG;
1502                ++peersReturning;
1503                ++walk;
1504            }
1505        }
1506
1507        assert( ( walk - pex ) == peersReturning );
1508        qsort( pex, peersReturning, sizeof( tr_pex ), tr_pexCompare );
1509        *setme_pex = pex;
1510    }
1511
1512    managerUnlock( t->manager );
1513    return peersReturning;
1514}
1515
1516void
1517tr_peerMgrStartTorrent( tr_torrent * tor )
1518{
1519    Torrent * t = tor->torrentPeers;
1520
1521    managerLock( t->manager );
1522
1523    assert( t );
1524
1525    if( !t->isRunning )
1526    {
1527        t->isRunning = TRUE;
1528
1529        if( !tr_ptrArrayEmpty( &t->webseeds ) )
1530            refillSoon( t );
1531    }
1532
1533    managerUnlock( t->manager );
1534}
1535
1536static void
1537stopTorrent( Torrent * t )
1538{
1539    assert( torrentIsLocked( t ) );
1540
1541    t->isRunning = FALSE;
1542
1543    /* disconnect the peers. */
1544    tr_ptrArrayForeach( &t->peers, (PtrArrayForeachFunc)peerDestructor );
1545    tr_ptrArrayClear( &t->peers );
1546
1547    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1548     * which removes the handshake from t->outgoingHandshakes... */
1549    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
1550        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
1551}
1552
1553void
1554tr_peerMgrStopTorrent( tr_torrent * tor )
1555{
1556    Torrent * t = tor->torrentPeers;
1557
1558    managerLock( t->manager );
1559
1560    stopTorrent( t );
1561
1562    managerUnlock( t->manager );
1563}
1564
1565void
1566tr_peerMgrAddTorrent( tr_peerMgr * manager,
1567                      tr_torrent * tor )
1568{
1569    managerLock( manager );
1570
1571    assert( tor );
1572    assert( tor->torrentPeers == NULL );
1573
1574    tor->torrentPeers = torrentConstructor( manager, tor );
1575
1576    managerUnlock( manager );
1577}
1578
1579void
1580tr_peerMgrRemoveTorrent( tr_torrent * tor )
1581{
1582    tr_torrentLock( tor );
1583
1584    stopTorrent( tor->torrentPeers );
1585    torrentDestructor( tor->torrentPeers );
1586
1587    tr_torrentUnlock( tor );
1588}
1589
1590void
1591tr_peerMgrTorrentAvailability( const tr_torrent * tor,
1592                               int8_t           * tab,
1593                               unsigned int       tabCount )
1594{
1595    tr_piece_index_t   i;
1596    const Torrent *    t;
1597    float              interval;
1598    tr_bool            isSeed;
1599    int                peerCount;
1600    const tr_peer **   peers;
1601    tr_torrentLock( tor );
1602
1603    t = tor->torrentPeers;
1604    tor = t->tor;
1605    interval = tor->info.pieceCount / (float)tabCount;
1606    isSeed = tor && ( tr_cpGetStatus ( &tor->completion ) == TR_SEED );
1607    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1608    peerCount = tr_ptrArraySize( &t->peers );
1609
1610    memset( tab, 0, tabCount );
1611
1612    for( i = 0; tor && i < tabCount; ++i )
1613    {
1614        const int piece = i * interval;
1615
1616        if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
1617            tab[i] = -1;
1618        else if( peerCount ) {
1619            int j;
1620            for( j = 0; j < peerCount; ++j )
1621                if( tr_bitfieldHas( peers[j]->have, i ) )
1622                    ++tab[i];
1623        }
1624    }
1625
1626    tr_torrentUnlock( tor );
1627}
1628
1629/* Returns the pieces that are available from peers */
1630tr_bitfield*
1631tr_peerMgrGetAvailable( const tr_torrent * tor )
1632{
1633    int i;
1634    int peerCount;
1635    Torrent * t = tor->torrentPeers;
1636    const tr_peer ** peers;
1637    tr_bitfield * pieces;
1638    managerLock( t->manager );
1639
1640    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1641    peerCount = tr_ptrArraySize( &t->peers );
1642    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1643    for( i=0; i<peerCount; ++i )
1644        tr_bitfieldOr( pieces, peers[i]->have );
1645
1646    managerUnlock( t->manager );
1647    return pieces;
1648}
1649
1650void
1651tr_peerMgrTorrentStats( tr_torrent       * tor,
1652                        int              * setmePeersKnown,
1653                        int              * setmePeersConnected,
1654                        int              * setmeSeedsConnected,
1655                        int              * setmeWebseedsSendingToUs,
1656                        int              * setmePeersSendingToUs,
1657                        int              * setmePeersGettingFromUs,
1658                        int              * setmePeersFrom )
1659{
1660    int i, size;
1661    const Torrent * t = tor->torrentPeers;
1662    const tr_peer ** peers;
1663    const tr_webseed ** webseeds;
1664
1665    managerLock( t->manager );
1666
1667    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1668    size = tr_ptrArraySize( &t->peers );
1669
1670    *setmePeersKnown           = tr_ptrArraySize( &t->pool );
1671    *setmePeersConnected       = 0;
1672    *setmeSeedsConnected       = 0;
1673    *setmePeersGettingFromUs   = 0;
1674    *setmePeersSendingToUs     = 0;
1675    *setmeWebseedsSendingToUs  = 0;
1676
1677    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1678        setmePeersFrom[i] = 0;
1679
1680    for( i=0; i<size; ++i )
1681    {
1682        const tr_peer * peer = peers[i];
1683        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1684
1685        if( peer->io == NULL ) /* not connected */
1686            continue;
1687
1688        ++*setmePeersConnected;
1689
1690        ++setmePeersFrom[atom->from];
1691
1692        if( clientIsDownloadingFrom( peer ) )
1693            ++*setmePeersSendingToUs;
1694
1695        if( clientIsUploadingTo( peer ) )
1696            ++*setmePeersGettingFromUs;
1697
1698        if( atom->flags & ADDED_F_SEED_FLAG )
1699            ++*setmeSeedsConnected;
1700    }
1701
1702    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1703    size = tr_ptrArraySize( &t->webseeds );
1704    for( i=0; i<size; ++i )
1705        if( tr_webseedIsActive( webseeds[i] ) )
1706            ++*setmeWebseedsSendingToUs;
1707
1708    managerUnlock( t->manager );
1709}
1710
1711float*
1712tr_peerMgrWebSpeeds( const tr_torrent * tor )
1713{
1714    const Torrent * t = tor->torrentPeers;
1715    const tr_webseed ** webseeds;
1716    int i;
1717    int webseedCount;
1718    float * ret;
1719    uint64_t now;
1720
1721    assert( t->manager );
1722    managerLock( t->manager );
1723
1724    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1725    webseedCount = tr_ptrArraySize( &t->webseeds );
1726    assert( webseedCount == tor->info.webseedCount );
1727    ret = tr_new0( float, webseedCount );
1728    now = tr_date( );
1729
1730    for( i=0; i<webseedCount; ++i )
1731        if( !tr_webseedGetSpeed( webseeds[i], now, &ret[i] ) )
1732            ret[i] = -1.0;
1733
1734    managerUnlock( t->manager );
1735    return ret;
1736}
1737
1738double
1739tr_peerGetPieceSpeed( const tr_peer * peer, uint64_t now, tr_direction direction )
1740{
1741    return peer->io ? tr_peerIoGetPieceSpeed( peer->io, now, direction ) : 0.0;
1742}
1743
1744
1745struct tr_peer_stat *
1746tr_peerMgrPeerStats( const tr_torrent    * tor,
1747                     int                 * setmeCount )
1748{
1749    int i, size;
1750    const Torrent * t = tor->torrentPeers;
1751    const tr_peer ** peers;
1752    tr_peer_stat * ret;
1753    uint64_t now;
1754
1755    assert( t->manager );
1756    managerLock( t->manager );
1757
1758    size = tr_ptrArraySize( &t->peers );
1759    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1760    ret = tr_new0( tr_peer_stat, size );
1761    now = tr_date( );
1762
1763    for( i = 0; i < size; ++i )
1764    {
1765        char *                   pch;
1766        const tr_peer *          peer = peers[i];
1767        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1768        tr_peer_stat *           stat = ret + i;
1769        tr_address               norm_addr;
1770
1771        norm_addr = peer->addr;
1772        tr_normalizeV4Mapped( &norm_addr );
1773        tr_ntop( &norm_addr, stat->addr, sizeof( stat->addr ) );
1774        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
1775                   sizeof( stat->client ) );
1776        stat->port               = ntohs( peer->port );
1777        stat->from               = atom->from;
1778        stat->progress           = peer->progress;
1779        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1780        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER );
1781        stat->rateToClient       = tr_peerGetPieceSpeed( peer, now, TR_PEER_TO_CLIENT );
1782        stat->peerIsChoked       = peer->peerIsChoked;
1783        stat->peerIsInterested   = peer->peerIsInterested;
1784        stat->clientIsChoked     = peer->clientIsChoked;
1785        stat->clientIsInterested = peer->clientIsInterested;
1786        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1787        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1788        stat->isUploadingTo      = clientIsUploadingTo( peer );
1789        stat->isSeed             = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
1790
1791        pch = stat->flagStr;
1792        if( t->optimistic == peer ) *pch++ = 'O';
1793        if( stat->isDownloadingFrom ) *pch++ = 'D';
1794        else if( stat->clientIsInterested ) *pch++ = 'd';
1795        if( stat->isUploadingTo ) *pch++ = 'U';
1796        else if( stat->peerIsInterested ) *pch++ = 'u';
1797        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
1798        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1799        if( stat->isEncrypted ) *pch++ = 'E';
1800        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1801        if( stat->isIncoming ) *pch++ = 'I';
1802        *pch = '\0';
1803    }
1804
1805    *setmeCount = size;
1806
1807    managerUnlock( t->manager );
1808    return ret;
1809}
1810
1811/**
1812***
1813**/
1814
1815struct ChokeData
1816{
1817    tr_bool         doUnchoke;
1818    tr_bool         isInterested;
1819    tr_bool         isChoked;
1820    int             rate;
1821    tr_peer *       peer;
1822};
1823
1824static int
1825compareChoke( const void * va,
1826              const void * vb )
1827{
1828    const struct ChokeData * a = va;
1829    const struct ChokeData * b = vb;
1830
1831    if( a->rate != b->rate ) /* prefer higher overall speeds */
1832        return a->rate > b->rate ? -1 : 1;
1833
1834    if( a->isChoked != b->isChoked ) /* prefer unchoked */
1835        return a->isChoked ? 1 : -1;
1836
1837    return 0;
1838}
1839
1840static int
1841isNew( const tr_peer * peer )
1842{
1843    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1844}
1845
1846static int
1847isSame( const tr_peer * peer )
1848{
1849    return peer && peer->client && strstr( peer->client, "Transmission" );
1850}
1851
1852/**
1853***
1854**/
1855
1856static void
1857rechokeTorrent( Torrent * t )
1858{
1859    int i, size, unchokedInterested;
1860    const int peerCount = tr_ptrArraySize( &t->peers );
1861    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1862    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1863    const tr_session * session = t->manager->session;
1864    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
1865    const uint64_t now = tr_date( );
1866
1867    assert( torrentIsLocked( t ) );
1868
1869    /* sort the peers by preference and rate */
1870    for( i = 0, size = 0; i < peerCount; ++i )
1871    {
1872        tr_peer * peer = peers[i];
1873        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1874
1875        if( peer->progress >= 1.0 ) /* choke all seeds */
1876        {
1877            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1878        }
1879        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
1880        {
1881            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1882        }
1883        else if( chokeAll ) /* choke everyone if we're not uploading */
1884        {
1885            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1886        }
1887        else
1888        {
1889            struct ChokeData * n = &choke[size++];
1890            n->peer         = peer;
1891            n->isInterested = peer->peerIsInterested;
1892            n->isChoked     = peer->peerIsChoked;
1893            n->rate         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER ) * 1024;
1894        }
1895    }
1896
1897    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
1898
1899    /**
1900     * Reciprocation and number of uploads capping is managed by unchoking
1901     * the N peers which have the best upload rate and are interested.
1902     * This maximizes the client's download rate. These N peers are
1903     * referred to as downloaders, because they are interested in downloading
1904     * from the client.
1905     *
1906     * Peers which have a better upload rate (as compared to the downloaders)
1907     * but aren't interested get unchoked. If they become interested, the
1908     * downloader with the worst upload rate gets choked. If a client has
1909     * a complete file, it uses its upload rate rather than its download
1910     * rate to decide which peers to unchoke.
1911     */
1912    unchokedInterested = 0;
1913    for( i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i ) {
1914        choke[i].doUnchoke = 1;
1915        if( choke[i].isInterested )
1916            ++unchokedInterested;
1917    }
1918
1919    /* optimistic unchoke */
1920    if( i < size )
1921    {
1922        int n;
1923        struct ChokeData * c;
1924        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
1925
1926        for( ; i<size; ++i )
1927        {
1928            if( choke[i].isInterested )
1929            {
1930                const tr_peer * peer = choke[i].peer;
1931                int x = 1, y;
1932                if( isNew( peer ) ) x *= 3;
1933                if( isSame( peer ) ) x *= 3;
1934                for( y=0; y<x; ++y )
1935                    tr_ptrArrayAppend( &randPool, &choke[i] );
1936            }
1937        }
1938
1939        if(( n = tr_ptrArraySize( &randPool )))
1940        {
1941            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
1942            c->doUnchoke = 1;
1943            t->optimistic = c->peer;
1944        }
1945
1946        tr_ptrArrayDestruct( &randPool, NULL );
1947    }
1948
1949    for( i=0; i<size; ++i )
1950        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1951
1952    /* cleanup */
1953    tr_free( choke );
1954}
1955
1956static int
1957rechokePulse( void * vmgr )
1958{
1959    tr_torrent * tor = NULL;
1960    tr_peerMgr * mgr = vmgr;
1961    managerLock( mgr );
1962
1963    while(( tor = tr_torrentNext( mgr->session, tor )))
1964        if( tor->isRunning )
1965            rechokeTorrent( tor->torrentPeers );
1966
1967    managerUnlock( mgr );
1968    return TRUE;
1969}
1970
1971/***
1972****
1973****  Life and Death
1974****
1975***/
1976
1977typedef enum
1978{
1979    TR_CAN_KEEP,
1980    TR_CAN_CLOSE,
1981    TR_MUST_CLOSE,
1982}
1983tr_close_type_t;
1984
1985static tr_close_type_t
1986shouldPeerBeClosed( const Torrent    * t,
1987                    const tr_peer    * peer,
1988                    int                peerCount )
1989{
1990    const tr_torrent *       tor = t->tor;
1991    const time_t             now = time( NULL );
1992    const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1993
1994    /* if it's marked for purging, close it */
1995    if( peer->doPurge )
1996    {
1997        tordbg( t, "purging peer %s because its doPurge flag is set",
1998                tr_peerIoAddrStr( &atom->addr, atom->port ) );
1999        return TR_MUST_CLOSE;
2000    }
2001
2002    /* if we're seeding and the peer has everything we have,
2003     * and enough time has passed for a pex exchange, then disconnect */
2004    if( tr_torrentIsSeed( tor ) )
2005    {
2006        int peerHasEverything;
2007        if( atom->flags & ADDED_F_SEED_FLAG )
2008            peerHasEverything = TRUE;
2009        else if( peer->progress < tr_cpPercentDone( &tor->completion ) )
2010            peerHasEverything = FALSE;
2011        else {
2012            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( &tor->completion ) );
2013            tr_bitfieldDifference( tmp, peer->have );
2014            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2015            tr_bitfieldFree( tmp );
2016        }
2017
2018        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 )))
2019        {
2020            tordbg( t, "purging peer %s because we're both seeds",
2021                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
2022            return TR_MUST_CLOSE;
2023        }
2024    }
2025
2026    /* disconnect if it's been too long since piece data has been transferred.
2027     * this is on a sliding scale based on number of available peers... */
2028    {
2029        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2030        /* if we have >= relaxIfFewerThan, strictness is 100%.
2031         * if we have zero connections, strictness is 0% */
2032        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
2033                               ? 1.0
2034                               : peerCount / (float)relaxStrictnessIfFewerThanN;
2035        const int lo = MIN_UPLOAD_IDLE_SECS;
2036        const int hi = MAX_UPLOAD_IDLE_SECS;
2037        const int limit = hi - ( ( hi - lo ) * strictness );
2038        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
2039/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
2040        if( idleTime > limit ) {
2041            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
2042                       tr_peerIoAddrStr( &atom->addr, atom->port ), idleTime );
2043            return TR_CAN_CLOSE;
2044        }
2045    }
2046
2047    return TR_CAN_KEEP;
2048}
2049
2050static tr_peer **
2051getPeersToClose( Torrent * t, tr_close_type_t closeType, int * setmeSize )
2052{
2053    int i, peerCount, outsize;
2054    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
2055    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2056
2057    assert( torrentIsLocked( t ) );
2058
2059    for( i = outsize = 0; i < peerCount; ++i )
2060        if( shouldPeerBeClosed( t, peers[i], peerCount ) == closeType )
2061            ret[outsize++] = peers[i];
2062
2063    *setmeSize = outsize;
2064    return ret;
2065}
2066
2067static int
2068compareCandidates( const void * va,
2069                   const void * vb )
2070{
2071    const struct peer_atom * a = *(const struct peer_atom**) va;
2072    const struct peer_atom * b = *(const struct peer_atom**) vb;
2073
2074    /* <Charles> Here we would probably want to try reconnecting to
2075     * peers that had most recently given us data. Lots of users have
2076     * trouble with resets due to their routers and/or ISPs. This way we
2077     * can quickly recover from an unwanted reset. So we sort
2078     * piece_data_time in descending order.
2079     */
2080
2081    if( a->piece_data_time != b->piece_data_time )
2082        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2083
2084    if( a->numFails != b->numFails )
2085        return a->numFails < b->numFails ? -1 : 1;
2086
2087    if( a->time != b->time )
2088        return a->time < b->time ? -1 : 1;
2089
2090    /* all other things being equal, prefer peers whose
2091     * information comes from a more reliable source */
2092    if( a->from != b->from )
2093        return a->from < b->from ? -1 : 1;
2094
2095    return 0;
2096}
2097
2098static int
2099getReconnectIntervalSecs( const struct peer_atom * atom )
2100{
2101    int          sec;
2102    const time_t now = time( NULL );
2103
2104    /* if we were recently connected to this peer and transferring piece
2105     * data, try to reconnect to them sooner rather that later -- we don't
2106     * want network troubles to get in the way of a good peer. */
2107    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2108        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2109
2110    /* don't allow reconnects more often than our minimum */
2111    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2112        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2113
2114    /* otherwise, the interval depends on how many times we've tried
2115     * and failed to connect to the peer */
2116    else switch( atom->numFails ) {
2117        case 0: sec = 0; break;
2118        case 1: sec = 5; break;
2119        case 2: sec = 2 * 60; break;
2120        case 3: sec = 15 * 60; break;
2121        case 4: sec = 30 * 60; break;
2122        case 5: sec = 60 * 60; break;
2123        default: sec = 120 * 60; break;
2124    }
2125
2126    return sec;
2127}
2128
2129static struct peer_atom **
2130getPeerCandidates( Torrent * t, int * setmeSize )
2131{
2132    int                 i, atomCount, retCount;
2133    struct peer_atom ** atoms;
2134    struct peer_atom ** ret;
2135    const time_t        now = time( NULL );
2136    const int           seed = tr_torrentIsSeed( t->tor );
2137
2138    assert( torrentIsLocked( t ) );
2139
2140    atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
2141    ret = tr_new( struct peer_atom*, atomCount );
2142    for( i = retCount = 0; i < atomCount; ++i )
2143    {
2144        int                interval;
2145        struct peer_atom * atom = atoms[i];
2146
2147        /* peer fed us too much bad data ... we only keep it around
2148         * now to weed it out in case someone sends it to us via pex */
2149        if( atom->myflags & MYFLAG_BANNED )
2150            continue;
2151
2152        /* peer was unconnectable before, so we're not going to keep trying.
2153         * this is needs a separate flag from `banned', since if they try
2154         * to connect to us later, we'll let them in */
2155        if( atom->myflags & MYFLAG_UNREACHABLE )
2156            continue;
2157
2158        /* we don't need two connections to the same peer... */
2159        if( peerIsInUse( t, &atom->addr ) )
2160            continue;
2161
2162        /* no need to connect if we're both seeds... */
2163        if( seed && ( ( atom->flags & ADDED_F_SEED_FLAG ) ||
2164                      ( atom->uploadOnly == UPLOAD_ONLY_YES ) ) )
2165            continue;
2166
2167        /* don't reconnect too often */
2168        interval = getReconnectIntervalSecs( atom );
2169        if( ( now - atom->time ) < interval )
2170        {
2171            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2172                    i, tr_peerIoAddrStr( &atom->addr, atom->port ), interval );
2173            continue;
2174        }
2175
2176        /* Don't connect to peers in our blocklist */
2177        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2178            continue;
2179
2180        ret[retCount++] = atom;
2181    }
2182
2183    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2184    *setmeSize = retCount;
2185    return ret;
2186}
2187
2188static void
2189closePeer( Torrent * t, tr_peer * peer )
2190{
2191    struct peer_atom * atom;
2192
2193    assert( t != NULL );
2194    assert( peer != NULL );
2195
2196    /* if we transferred piece data, then they might be good peers,
2197       so reset their `numFails' weight to zero.  otherwise we connected
2198       to them fruitlessly, so mark it as another fail */
2199    atom = getExistingAtom( t, &peer->addr );
2200    if( atom->piece_data_time )
2201        atom->numFails = 0;
2202    else
2203        ++atom->numFails;
2204
2205    tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
2206    removePeer( t, peer );
2207}
2208
2209static void
2210reconnectTorrent( Torrent * t )
2211{
2212    static time_t prevTime = 0;
2213    static int    newConnectionsThisSecond = 0;
2214    time_t        now;
2215
2216    now = time( NULL );
2217    if( prevTime != now )
2218    {
2219        prevTime = now;
2220        newConnectionsThisSecond = 0;
2221    }
2222
2223    if( !t->isRunning )
2224    {
2225        removeAllPeers( t );
2226    }
2227    else
2228    {
2229        int i;
2230        int canCloseCount;
2231        int mustCloseCount;
2232        int candidateCount;
2233        int maxCandidates;
2234        struct tr_peer ** canClose = getPeersToClose( t, TR_CAN_CLOSE, &canCloseCount );
2235        struct tr_peer ** mustClose = getPeersToClose( t, TR_MUST_CLOSE, &mustCloseCount );
2236        struct peer_atom ** candidates = getPeerCandidates( t, &candidateCount );
2237
2238        tordbg( t, "reconnect pulse for [%s]: "
2239                   "%d must-close connections, "
2240                   "%d can-close connections, "
2241                   "%d connection candidates, "
2242                   "%d atoms, "
2243                   "max per pulse is %d",
2244                   t->tor->info.name,
2245                   mustCloseCount,
2246                   canCloseCount,
2247                   candidateCount,
2248                   tr_ptrArraySize( &t->pool ),
2249                   MAX_RECONNECTIONS_PER_PULSE );
2250
2251        /* disconnect the really bad peers */
2252        for( i=0; i<mustCloseCount; ++i )
2253            closePeer( t, mustClose[i] );
2254
2255        /* decide how many peers can we try to add in this pass */
2256        maxCandidates = candidateCount;
2257        maxCandidates = MIN( maxCandidates, MAX_RECONNECTIONS_PER_PULSE );
2258        maxCandidates = MIN( maxCandidates, getMaxPeerCount( t->tor ) - getPeerCount( t ) );
2259        maxCandidates = MIN( maxCandidates, MAX_CONNECTIONS_PER_SECOND - newConnectionsThisSecond );
2260
2261        /* maybe disconnect some lesser peers, if we have candidates to replace them with */
2262        for( i=0; ( i<canCloseCount ) && ( i<maxCandidates ); ++i )
2263            closePeer( t, canClose[i] );
2264
2265        tordbg( t, "candidateCount is %d, MAX_RECONNECTIONS_PER_PULSE is %d,"
2266                   " getPeerCount(t) is %d, getMaxPeerCount(t) is %d, "
2267                   "newConnectionsThisSecond is %d, MAX_CONNECTIONS_PER_SECOND is %d",
2268                   candidateCount,
2269                   MAX_RECONNECTIONS_PER_PULSE,
2270                   getPeerCount( t ),
2271                   getMaxPeerCount( t->tor ),
2272                   newConnectionsThisSecond, MAX_CONNECTIONS_PER_SECOND );
2273
2274        /* add some new ones */
2275        for( i=0; i<maxCandidates; ++i )
2276        {
2277            tr_peerMgr        * mgr = t->manager;
2278            struct peer_atom  * atom = candidates[i];
2279            tr_peerIo         * io;
2280
2281            tordbg( t, "Starting an OUTGOING connection with %s",
2282                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2283
2284            io = tr_peerIoNewOutgoing( mgr->session, mgr->session->bandwidth, &atom->addr, atom->port, t->hash );
2285
2286            if( io == NULL )
2287            {
2288                tordbg( t, "peerIo not created; marking peer %s as unreachable",
2289                        tr_peerIoAddrStr( &atom->addr, atom->port ) );
2290                atom->myflags |= MYFLAG_UNREACHABLE;
2291            }
2292            else
2293            {
2294                tr_handshake * handshake = tr_handshakeNew( io,
2295                                                            mgr->session->encryptionMode,
2296                                                            myHandshakeDoneCB,
2297                                                            mgr );
2298
2299                assert( tr_peerIoGetTorrentHash( io ) );
2300
2301                tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewOutgoing() */
2302
2303                ++newConnectionsThisSecond;
2304
2305                tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
2306                                         handshakeCompare );
2307            }
2308
2309            atom->time = time( NULL );
2310        }
2311
2312        /* cleanup */
2313        tr_free( candidates );
2314        tr_free( mustClose );
2315        tr_free( canClose );
2316    }
2317}
2318
2319static int
2320reconnectPulse( void * vmgr )
2321{
2322    tr_torrent * tor = NULL;
2323    tr_peerMgr * mgr = vmgr;
2324    managerLock( mgr );
2325
2326    while(( tor = tr_torrentNext( mgr->session, tor )))
2327        if( tor->isRunning )
2328            reconnectTorrent( tor->torrentPeers );
2329
2330    managerUnlock( mgr );
2331    return TRUE;
2332}
2333
2334/****
2335*****
2336*****  BANDWIDTH ALLOCATION
2337*****
2338****/
2339
2340static void
2341pumpAllPeers( tr_peerMgr * mgr )
2342{
2343    tr_torrent * tor = NULL;
2344
2345    while(( tor = tr_torrentNext( mgr->session, tor )))
2346    {
2347        int j;
2348        Torrent * t = tor->torrentPeers;
2349
2350        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
2351        {
2352            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
2353            tr_peerMsgsPulse( peer->msgs );
2354        }
2355    }
2356}
2357
2358static int
2359bandwidthPulse( void * vmgr )
2360{
2361    tr_peerMgr * mgr = vmgr;
2362    managerLock( mgr );
2363
2364    /* FIXME: this next line probably isn't necessary... */
2365    pumpAllPeers( mgr );
2366
2367    /* allocate bandwidth to the peers */
2368    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
2369    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
2370
2371    managerUnlock( mgr );
2372    return TRUE;
2373}
Note: See TracBrowser for help on using the repository browser.