source: trunk/libtransmission/peer-mgr.c @ 7619

Last change on this file since 7619 was 7619, checked in by charles, 12 years ago

(trunk libT) probably fix r7618 reported in #transmission by Rolcol

  • Property svn:keywords set to Date Rev Author Id
File size: 68.2 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 7619 2009-01-05 06:45:08Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h> /* memcpy, memcmp, strstr */
16#include <stdlib.h> /* qsort */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "session.h"
23#include "bandwidth.h"
24#include "bencode.h"
25#include "blocklist.h"
26#include "clients.h"
27#include "completion.h"
28#include "crypto.h"
29#include "handshake.h"
30#include "inout.h" /* tr_ioTestPiece */
31#include "net.h"
32#include "peer-io.h"
33#include "peer-mgr.h"
34#include "peer-mgr-private.h"
35#include "peer-msgs.h"
36#include "ptrarray.h"
37#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
38#include "torrent.h"
39#include "trevent.h"
40#include "utils.h"
41#include "webseed.h"
42
43enum
44{
45    /* how frequently to change which peers are choked */
46    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
47
48    /* minimum interval for refilling peers' request lists */
49    REFILL_PERIOD_MSEC = 333,
50
51    /* when many peers are available, keep idle ones this long */
52    MIN_UPLOAD_IDLE_SECS = ( 30 ),
53
54    /* when few peers are available, keep idle ones this long */
55    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
56
57    /* how frequently to decide which peers live and die */
58    RECONNECT_PERIOD_MSEC = ( 2 * 1000 ),
59   
60    /* how frequently to reallocate bandwidth */
61    BANDWIDTH_PERIOD_MSEC = 500,
62
63    /* max # of peers to ask fer per torrent per reconnect pulse */
64    MAX_RECONNECTIONS_PER_PULSE = 16,
65
66    /* max number of peers to ask for per second overall.
67    * this throttle is to avoid overloading the router */
68    MAX_CONNECTIONS_PER_SECOND = 32,
69
70    /* number of unchoked peers per torrent.
71     * FIXME: this probably ought to be configurable */
72    MAX_UNCHOKED_PEERS = 14,
73
74    /* number of bad pieces a peer is allowed to send before we ban them */
75    MAX_BAD_PIECES_PER_PEER = 5,
76
77    /* use for bitwise operations w/peer_atom.myflags */
78    MYFLAG_BANNED = 1,
79
80    /* unreachable for now... but not banned.
81     * if they try to connect to us it's okay */
82    MYFLAG_UNREACHABLE = 2,
83
84    /* the minimum we'll wait before attempting to reconnect to a peer */
85    MINIMUM_RECONNECT_INTERVAL_SECS = 5
86};
87
88
89/**
90***
91**/
92
93enum
94{
95    UPLOAD_ONLY_UKNOWN,
96    UPLOAD_ONLY_YES,
97    UPLOAD_ONLY_NO
98};
99
100/**
101 * Peer information that should be kept even before we've connected and
102 * after we've disconnected.  These are kept in a pool of peer_atoms to decide
103 * which ones would make good candidates for connecting to, and to watch out
104 * for banned peers.
105 *
106 * @see tr_peer
107 * @see tr_peermsgs
108 */
109struct peer_atom
110{
111    uint8_t     from;
112    uint8_t     flags;       /* these match the added_f flags */
113    uint8_t     myflags;     /* flags that aren't defined in added_f */
114    uint8_t     uploadOnly;  /* UPLOAD_ONLY_ */
115    tr_port     port;
116    uint16_t    numFails;
117    tr_address  addr;
118    time_t      time;        /* when the peer's connection status last changed */
119    time_t      piece_data_time;
120};
121
122typedef struct
123{
124    tr_bool         isRunning;
125
126    uint8_t         hash[SHA_DIGEST_LENGTH];
127    int         *   pendingRequestCount;
128    tr_ptrArray     outgoingHandshakes; /* tr_handshake */
129    tr_ptrArray     pool; /* struct peer_atom */
130    tr_ptrArray     peers; /* tr_peer */
131    tr_ptrArray     webseeds; /* tr_webseed */
132    tr_timer *      reconnectTimer;
133    tr_timer *      rechokeTimer;
134    tr_timer *      refillTimer;
135    tr_torrent *    tor;
136    tr_peer *       optimistic; /* the optimistic peer, or NULL if none */
137
138    struct tr_peerMgr * manager;
139}
140Torrent;
141
142struct tr_peerMgr
143{
144    tr_session      * session;
145    tr_ptrArray       torrents; /* Torrent */
146    tr_ptrArray       incomingHandshakes; /* tr_handshake */
147    tr_timer        * bandwidthTimer;
148};
149
150#define tordbg( t, ... ) \
151    do { \
152        if( tr_deepLoggingIsActive( ) ) \
153            tr_deepLog( __FILE__, __LINE__, t->tor->info.name, __VA_ARGS__ ); \
154    } while( 0 )
155
156#define dbgmsg( ... ) \
157    do { \
158        if( tr_deepLoggingIsActive( ) ) \
159            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
160    } while( 0 )
161
162/**
163***
164**/
165
166static inline void
167managerLock( const struct tr_peerMgr * manager )
168{
169    tr_globalLock( manager->session );
170}
171
172static inline void
173managerUnlock( const struct tr_peerMgr * manager )
174{
175    tr_globalUnlock( manager->session );
176}
177
178static inline void
179torrentLock( Torrent * torrent )
180{
181    managerLock( torrent->manager );
182}
183
184static inline void
185torrentUnlock( Torrent * torrent )
186{
187    managerUnlock( torrent->manager );
188}
189
190static inline int
191torrentIsLocked( const Torrent * t )
192{
193    return tr_globalIsLocked( t->manager->session );
194}
195
196/**
197***
198**/
199
200static int
201handshakeCompareToAddr( const void * va, const void * vb )
202{
203    const tr_handshake * a = va;
204
205    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
206}
207
208static int
209handshakeCompare( const void * a, const void * b )
210{
211    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
212}
213
214static tr_handshake*
215getExistingHandshake( tr_ptrArray      * handshakes,
216                      const tr_address * addr )
217{
218    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
219}
220
221static int
222comparePeerAtomToAddress( const void * va, const void * vb )
223{
224    const struct peer_atom * a = va;
225
226    return tr_compareAddresses( &a->addr, vb );
227}
228
229static int
230comparePeerAtoms( const void * va, const void * vb )
231{
232    const struct peer_atom * b = vb;
233
234    return comparePeerAtomToAddress( va, &b->addr );
235}
236
237/**
238***
239**/
240
241static int
242torrentCompare( const void * va,
243                const void * vb )
244{
245    const Torrent * a = va;
246    const Torrent * b = vb;
247
248    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
249}
250
251static int
252torrentCompareToHash( const void * va,
253                      const void * vb )
254{
255    const Torrent * a = va;
256    const uint8_t * b_hash = vb;
257
258    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
259}
260
261static Torrent*
262getExistingTorrent( tr_peerMgr *    manager,
263                    const uint8_t * hash )
264{
265    return (Torrent*) tr_ptrArrayFindSorted( &manager->torrents,
266                                             hash,
267                                             torrentCompareToHash );
268}
269
270static int
271peerCompare( const void * va, const void * vb )
272{
273    const tr_peer * a = va;
274    const tr_peer * b = vb;
275
276    return tr_compareAddresses( &a->addr, &b->addr );
277}
278
279static int
280peerCompareToAddr( const void * va, const void * vb )
281{
282    const tr_peer * a = va;
283
284    return tr_compareAddresses( &a->addr, vb );
285}
286
287static tr_peer*
288getExistingPeer( Torrent          * torrent,
289                 const tr_address * addr )
290{
291    assert( torrentIsLocked( torrent ) );
292    assert( addr );
293
294    return tr_ptrArrayFindSorted( &torrent->peers, addr, peerCompareToAddr );
295}
296
297static struct peer_atom*
298getExistingAtom( const Torrent    * t,
299                 const tr_address * addr )
300{
301    Torrent * tt = (Torrent*)t;
302    assert( torrentIsLocked( t ) );
303    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
304}
305
306static tr_bool
307peerIsInUse( const Torrent    * ct,
308             const tr_address * addr )
309{
310    Torrent * t = (Torrent*) ct;
311
312    assert( torrentIsLocked ( t ) );
313
314    return getExistingPeer( t, addr )
315        || getExistingHandshake( &t->outgoingHandshakes, addr )
316        || getExistingHandshake( &t->manager->incomingHandshakes, addr );
317}
318
319static tr_peer*
320peerConstructor( const tr_address * addr )
321{
322    tr_peer * p;
323    p = tr_new0( tr_peer, 1 );
324    p->addr = *addr;
325    return p;
326}
327
328static tr_peer*
329getPeer( Torrent          * torrent,
330         const tr_address * addr )
331{
332    tr_peer * peer;
333
334    assert( torrentIsLocked( torrent ) );
335
336    peer = getExistingPeer( torrent, addr );
337
338    if( peer == NULL )
339    {
340        peer = peerConstructor( addr );
341        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
342    }
343
344    return peer;
345}
346
347static void
348peerDestructor( tr_peer * peer )
349{
350    assert( peer );
351
352    if( peer->msgs != NULL )
353    {
354        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
355        tr_peerMsgsFree( peer->msgs );
356    }
357
358    tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
359
360    tr_bitfieldFree( peer->have );
361    tr_bitfieldFree( peer->blame );
362    tr_free( peer->client );
363
364    tr_free( peer );
365}
366
367static void
368removePeer( Torrent * t,
369            tr_peer * peer )
370{
371    tr_peer *          removed;
372    struct peer_atom * atom;
373
374    assert( torrentIsLocked( t ) );
375
376    atom = getExistingAtom( t, &peer->addr );
377    assert( atom );
378    atom->time = time( NULL );
379
380    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
381    assert( removed == peer );
382    peerDestructor( removed );
383}
384
385static void
386removeAllPeers( Torrent * t )
387{
388    while( !tr_ptrArrayEmpty( &t->peers ) )
389        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
390}
391
392static void
393torrentDestructor( void * vt )
394{
395    Torrent * t = vt;
396    uint8_t   hash[SHA_DIGEST_LENGTH];
397
398    assert( t );
399    assert( !t->isRunning );
400    assert( torrentIsLocked( t ) );
401    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
402    assert( tr_ptrArrayEmpty( &t->peers ) );
403
404    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
405
406    tr_timerFree( &t->reconnectTimer );
407    tr_timerFree( &t->rechokeTimer );
408    tr_timerFree( &t->refillTimer );
409
410    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
411    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
412    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
413    tr_ptrArrayDestruct( &t->peers, NULL );
414
415    tr_free( t->pendingRequestCount );
416    tr_free( t );
417}
418
419static void peerCallbackFunc( void * vpeer,
420                              void * vevent,
421                              void * vt );
422
423static Torrent*
424torrentConstructor( tr_peerMgr * manager,
425                    tr_torrent * tor )
426{
427    int       i;
428    Torrent * t;
429
430    t = tr_new0( Torrent, 1 );
431    t->manager = manager;
432    t->tor = tor;
433    t->pool = TR_PTR_ARRAY_INIT;
434    t->peers = TR_PTR_ARRAY_INIT;
435    t->webseeds = TR_PTR_ARRAY_INIT;
436    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
437    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
438
439    for( i = 0; i < tor->info.webseedCount; ++i )
440    {
441        tr_webseed * w =
442            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
443        tr_ptrArrayAppend( &t->webseeds, w );
444    }
445
446    return t;
447}
448
449
450static int bandwidthPulse( void * vmgr );
451
452
453tr_peerMgr*
454tr_peerMgrNew( tr_session * session )
455{
456    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
457
458    m->session = session;
459    m->torrents = TR_PTR_ARRAY_INIT;
460    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
461    m->bandwidthTimer = tr_timerNew( session, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
462    return m;
463}
464
465void
466tr_peerMgrFree( tr_peerMgr * manager )
467{
468    managerLock( manager );
469
470    tr_timerFree( &manager->bandwidthTimer );
471
472    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
473     * the item from manager->handshakes, so this is a little roundabout... */
474    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
475        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
476
477    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
478
479    /* free the torrents. */
480    tr_ptrArrayDestruct( &manager->torrents, torrentDestructor );
481
482    managerUnlock( manager );
483    tr_free( manager );
484}
485
486static int
487clientIsDownloadingFrom( const tr_peer * peer )
488{
489    return peer->clientIsInterested && !peer->clientIsChoked;
490}
491
492static int
493clientIsUploadingTo( const tr_peer * peer )
494{
495    return peer->peerIsInterested && !peer->peerIsChoked;
496}
497
498/***
499****
500***/
501
502tr_bool
503tr_peerMgrPeerIsSeed( const tr_peerMgr  * mgr,
504                      const uint8_t     * torrentHash,
505                      const tr_address  * addr )
506{
507    tr_bool isSeed = FALSE;
508    const Torrent * t = NULL;
509    const struct peer_atom * atom = NULL;
510
511    t = getExistingTorrent( (tr_peerMgr*)mgr, torrentHash );
512    if( t )
513        atom = getExistingAtom( t, addr );
514    if( atom )
515        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
516
517    return isSeed;
518}
519
520/****
521*****
522*****  REFILL
523*****
524****/
525
526static void
527assertValidPiece( Torrent * t, tr_piece_index_t piece )
528{
529    assert( t );
530    assert( t->tor );
531    assert( piece < t->tor->info.pieceCount );
532}
533
534static int
535getPieceRequests( Torrent * t, tr_piece_index_t piece )
536{
537    assertValidPiece( t, piece );
538
539    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
540}
541
542static void
543incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
544{
545    assertValidPiece( t, piece );
546
547    if( t->pendingRequestCount == NULL )
548        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
549    t->pendingRequestCount[piece]++;
550}
551
552static void
553decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
554{
555    assertValidPiece( t, piece );
556
557    if( t->pendingRequestCount )
558        t->pendingRequestCount[piece]--;
559}
560
561struct tr_refill_piece
562{
563    tr_priority_t    priority;
564    uint32_t         piece;
565    uint32_t         peerCount;
566    int              random;
567    int              pendingRequestCount;
568    int              missingBlockCount;
569};
570
571static int
572compareRefillPiece( const void * aIn, const void * bIn )
573{
574    const struct tr_refill_piece * a = aIn;
575    const struct tr_refill_piece * b = bIn;
576
577    /* if one piece has a higher priority, it goes first */
578    if( a->priority != b->priority )
579        return a->priority > b->priority ? -1 : 1;
580
581    /* have a per-priority endgame */
582    if( a->pendingRequestCount != b->pendingRequestCount )
583        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
584
585    /* fewer missing pieces goes first */
586    if( a->missingBlockCount != b->missingBlockCount )
587        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
588
589    /* otherwise if one has fewer peers, it goes first */
590    if( a->peerCount != b->peerCount )
591        return a->peerCount < b->peerCount ? -1 : 1;
592
593    /* otherwise go with our random seed */
594    if( a->random != b->random )
595        return a->random < b->random ? -1 : 1;
596
597    return 0;
598}
599
600static tr_piece_index_t *
601getPreferredPieces( Torrent * t, tr_piece_index_t * pieceCount )
602{
603    const tr_torrent  * tor = t->tor;
604    const tr_info     * inf = &tor->info;
605    tr_piece_index_t    i;
606    tr_piece_index_t    poolSize = 0;
607    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
608    int                 peerCount;
609    const tr_peer    ** peers;
610
611    assert( torrentIsLocked( t ) );
612
613    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
614    peerCount = tr_ptrArraySize( &t->peers );
615
616    /* make a list of the pieces that we want but don't have */
617    for( i = 0; i < inf->pieceCount; ++i )
618        if( !tor->info.pieces[i].dnd
619                && !tr_cpPieceIsComplete( &tor->completion, i ) )
620            pool[poolSize++] = i;
621
622    /* sort the pool by which to request next */
623    if( poolSize > 1 )
624    {
625        tr_piece_index_t j;
626        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
627
628        for( j = 0; j < poolSize; ++j )
629        {
630            int k;
631            const tr_piece_index_t piece = pool[j];
632            struct tr_refill_piece * setme = p + j;
633
634            setme->piece = piece;
635            setme->priority = inf->pieces[piece].priority;
636            setme->peerCount = 0;
637            setme->random = tr_cryptoWeakRandInt( INT_MAX );
638            setme->pendingRequestCount = getPieceRequests( t, piece );
639            setme->missingBlockCount
640                         = tr_cpMissingBlocksInPiece( &tor->completion, piece );
641
642            for( k = 0; k < peerCount; ++k )
643            {
644                const tr_peer * peer = peers[k];
645                if( peer->peerIsInterested
646                        && !peer->clientIsChoked
647                        && tr_bitfieldHas( peer->have, piece ) )
648                    ++setme->peerCount;
649            }
650        }
651
652        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
653               compareRefillPiece );
654
655        for( j = 0; j < poolSize; ++j )
656            pool[j] = p[j].piece;
657
658        tr_free( p );
659    }
660
661    *pieceCount = poolSize;
662    return pool;
663}
664
665struct tr_blockIterator
666{
667    Torrent * t;
668    tr_block_index_t blockIndex, blockCount, *blocks;
669    tr_piece_index_t pieceIndex, pieceCount, *pieces;
670};
671
672static struct tr_blockIterator*
673blockIteratorNew( Torrent * t )
674{
675    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
676    i->t = t;
677    i->pieces = getPreferredPieces( t, &i->pieceCount );
678    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCountInPiece );
679    return i;
680}
681
682static int
683blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
684{
685    int found;
686    Torrent * t = i->t;
687    tr_torrent * tor = t->tor;
688
689    while( ( i->blockIndex == i->blockCount )
690        && ( i->pieceIndex < i->pieceCount ) )
691    {
692        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
693        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
694        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
695        tr_block_index_t block;
696
697        assert( index < tor->info.pieceCount );
698
699        i->blockCount = 0;
700        i->blockIndex = 0;
701        for( block=b; block!=e; ++block )
702            if( !tr_cpBlockIsComplete( &tor->completion, block ) )
703                i->blocks[i->blockCount++] = block;
704    }
705
706    assert( i->blockCount <= tor->blockCountInPiece );
707
708    if(( found = ( i->blockIndex < i->blockCount )))
709        *setme = i->blocks[i->blockIndex++];
710
711    return found;
712}
713
714static void
715blockIteratorFree( struct tr_blockIterator * i )
716{
717    tr_free( i->blocks );
718    tr_free( i->pieces );
719    tr_free( i );
720}
721
722static tr_peer**
723getPeersUploadingToClient( Torrent * t,
724                           int *     setmeCount )
725{
726    int j;
727    int peerCount = 0;
728    int retCount = 0;
729    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
730    tr_peer ** ret = tr_new( tr_peer *, peerCount );
731
732    j = 0; /* this is a temporary test to make sure we walk through all the peers */
733    if( peerCount )
734    {
735        /* Get a list of peers we're downloading from.
736           Pick a different starting point each time so all peers
737           get a chance at being the first in line */
738        const int fencepost = tr_cryptoWeakRandInt( peerCount );
739        int i = fencepost;
740        do {
741            if( clientIsDownloadingFrom( peers[i] ) )
742                ret[retCount++] = peers[i];
743            i = ( i + 1 ) % peerCount;
744            ++j;
745        } while( i != fencepost );
746    }
747    assert( j == peerCount );
748    *setmeCount = retCount;
749    return ret;
750}
751
752static uint32_t
753getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
754{
755    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
756    const uint64_t blockPos = tor->blockSize * b;
757    assert( blockPos >= piecePos );
758    return (uint32_t)( blockPos - piecePos );
759}
760
761static int
762refillPulse( void * vtorrent )
763{
764    tr_block_index_t block;
765    int peerCount;
766    int webseedCount;
767    tr_peer ** peers;
768    tr_webseed ** webseeds;
769    struct tr_blockIterator * blockIterator;
770    Torrent * t = vtorrent;
771    tr_torrent * tor = t->tor;
772
773    if( !t->isRunning )
774        return TRUE;
775    if( tr_torrentIsSeed( t->tor ) )
776        return TRUE;
777
778    torrentLock( t );
779    tordbg( t, "Refilling Request Buffers..." );
780
781    blockIterator = blockIteratorNew( t );
782    peers = getPeersUploadingToClient( t, &peerCount );
783    webseedCount = tr_ptrArraySize( &t->webseeds );
784    webseeds = tr_memdup( tr_ptrArrayBase( &t->webseeds ),
785                          webseedCount * sizeof( tr_webseed* ) );
786
787    while( ( webseedCount || peerCount )
788        && blockIteratorNext( blockIterator, &block ) )
789    {
790        int j;
791        int handled = FALSE;
792
793        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
794        const uint32_t offset = getBlockOffsetInPiece( tor, block );
795        const uint32_t length = tr_torBlockCountBytes( tor, block );
796
797        /* find a peer who can ask for this block */
798        for( j=0; !handled && j<peerCount; )
799        {
800            const tr_addreq_t val = tr_peerMsgsAddRequest( peers[j]->msgs, index, offset, length );
801            switch( val )
802            {
803                case TR_ADDREQ_FULL:
804                case TR_ADDREQ_CLIENT_CHOKED:
805                    peers[j] = peers[--peerCount];
806                    break;
807
808                case TR_ADDREQ_MISSING:
809                case TR_ADDREQ_DUPLICATE:
810                    ++j;
811                    break;
812
813                case TR_ADDREQ_OK:
814                    incrementPieceRequests( t, index );
815                    handled = TRUE;
816                    break;
817
818                default:
819                    assert( 0 && "unhandled value" );
820                    break;
821            }
822        }
823
824        /* maybe one of the webseeds can do it */
825        for( j=0; !handled && j<webseedCount; )
826        {
827            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j], index, offset, length );
828            switch( val )
829            {
830                case TR_ADDREQ_FULL:
831                    webseeds[j] = webseeds[--webseedCount];
832                    break;
833
834                case TR_ADDREQ_OK:
835                    incrementPieceRequests( t, index );
836                    handled = TRUE;
837                    break;
838
839                default:
840                    assert( 0 && "unhandled value" );
841                    break;
842            }
843        }
844    }
845
846    /* cleanup */
847    blockIteratorFree( blockIterator );
848    tr_free( webseeds );
849    tr_free( peers );
850
851    t->refillTimer = NULL;
852    torrentUnlock( t );
853    return FALSE;
854}
855
856static void
857broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
858{
859    size_t i;
860    size_t peerCount;
861    tr_peer ** peers;
862
863    assert( torrentIsLocked( t ) );
864
865    tordbg( t, "got a block; cancelling any duplicate requests from peers %"PRIu32":%"PRIu32"->%"PRIu32, index, offset, length );
866
867    peerCount = tr_ptrArraySize( &t->peers );
868    peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
869    for( i=0; i<peerCount; ++i )
870        if( peers[i]->msgs )
871            tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
872}
873
874static void
875addStrike( Torrent * t,
876           tr_peer * peer )
877{
878    tordbg( t, "increasing peer %s strike count to %d",
879            tr_peerIoAddrStr( &peer->addr,
880                              peer->port ), peer->strikes + 1 );
881
882    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
883    {
884        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
885        atom->myflags |= MYFLAG_BANNED;
886        peer->doPurge = 1;
887        tordbg( t, "banning peer %s", tr_peerIoAddrStr( &atom->addr, atom->port ) );
888    }
889}
890
891static void
892gotBadPiece( Torrent *        t,
893             tr_piece_index_t pieceIndex )
894{
895    tr_torrent *   tor = t->tor;
896    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
897
898    tor->corruptCur += byteCount;
899    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
900}
901
902static void
903refillSoon( Torrent * t )
904{
905    if( t->refillTimer == NULL )
906        t->refillTimer = tr_timerNew( t->manager->session,
907                                      refillPulse, t,
908                                      REFILL_PERIOD_MSEC );
909}
910
911static void
912peerSuggestedPiece( Torrent            * t UNUSED,
913                    tr_peer            * peer UNUSED,
914                    tr_piece_index_t     pieceIndex UNUSED,
915                    int                  isFastAllowed UNUSED )
916{
917#if 0
918    assert( t );
919    assert( peer );
920    assert( peer->msgs );
921
922    /* is this a valid piece? */
923    if(  pieceIndex >= t->tor->info.pieceCount )
924        return;
925
926    /* don't ask for it if we've already got it */
927    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
928        return;
929
930    /* don't ask for it if they don't have it */
931    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
932        return;
933
934    /* don't ask for it if we're choked and it's not fast */
935    if( !isFastAllowed && peer->clientIsChoked )
936        return;
937
938    /* request the blocks that we don't have in this piece */
939    {
940        tr_block_index_t block;
941        const tr_torrent * tor = t->tor;
942        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
943        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
944
945        for( block=start; block<end; ++block )
946        {
947            if( !tr_cpBlockIsComplete( tor->completion, block ) )
948            {
949                const uint32_t offset = getBlockOffsetInPiece( tor, block );
950                const uint32_t length = tr_torBlockCountBytes( tor, block );
951                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
952                incrementPieceRequests( t, pieceIndex );
953            }
954        }
955    }
956#endif
957}
958
959static void
960peerCallbackFunc( void * vpeer, void * vevent, void * vt )
961{
962    tr_peer * peer = vpeer; /* may be NULL if peer is a webseed */
963    Torrent * t = vt;
964    const tr_peer_event * e = vevent;
965
966    torrentLock( t );
967
968    switch( e->eventType )
969    {
970        case TR_PEER_UPLOAD_ONLY:
971            /* update our atom */
972            if( peer ) {
973                struct peer_atom * a = getExistingAtom( t, &peer->addr );
974                a->uploadOnly = e->uploadOnly ? UPLOAD_ONLY_YES : UPLOAD_ONLY_NO;
975            }
976            break;
977
978        case TR_PEER_NEED_REQ:
979            refillSoon( t );
980            break;
981
982        case TR_PEER_CANCEL:
983            decrementPieceRequests( t, e->pieceIndex );
984            break;
985
986        case TR_PEER_PEER_GOT_DATA:
987        {
988            const time_t now = time( NULL );
989            tr_torrent * tor = t->tor;
990
991            tor->activityDate = now;
992
993            if( e->wasPieceData )
994                tor->uploadedCur += e->length;
995
996            /* update the stats */
997            if( e->wasPieceData )
998                tr_statsAddUploaded( tor->session, e->length );
999
1000            /* update our atom */
1001            if( peer ) {
1002                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1003                if( e->wasPieceData )
1004                    a->piece_data_time = now;
1005            }
1006
1007            break;
1008        }
1009
1010        case TR_PEER_CLIENT_GOT_SUGGEST:
1011            if( peer )
1012                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1013            break;
1014
1015        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1016            if( peer )
1017                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1018            break;
1019
1020        case TR_PEER_CLIENT_GOT_DATA:
1021        {
1022            const time_t now = time( NULL );
1023            tr_torrent * tor = t->tor;
1024            tor->activityDate = now;
1025
1026            /* only add this to downloadedCur if we got it from a peer --
1027             * webseeds shouldn't count against our ratio.  As one tracker
1028             * admin put it, "Those pieces are downloaded directly from the
1029             * content distributor, not the peers, it is the tracker's job
1030             * to manage the swarms, not the web server and does not fit
1031             * into the jurisdiction of the tracker." */
1032            if( peer && e->wasPieceData )
1033                tor->downloadedCur += e->length;
1034
1035            /* update the stats */ 
1036            if( e->wasPieceData )
1037                tr_statsAddDownloaded( tor->session, e->length );
1038
1039            /* update our atom */
1040            if( peer ) {
1041                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1042                if( e->wasPieceData )
1043                    a->piece_data_time = now;
1044            }
1045
1046            break;
1047        }
1048
1049        case TR_PEER_PEER_PROGRESS:
1050        {
1051            if( peer )
1052            {
1053                struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1054                const int peerIsSeed = e->progress >= 1.0;
1055                if( peerIsSeed ) {
1056                    tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1057                    atom->flags |= ADDED_F_SEED_FLAG;
1058                } else {
1059                    tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1060                    atom->flags &= ~ADDED_F_SEED_FLAG;
1061                }
1062            }
1063            break;
1064        }
1065
1066        case TR_PEER_CLIENT_GOT_BLOCK:
1067        {
1068            tr_torrent *     tor = t->tor;
1069
1070            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1071
1072            tr_cpBlockAdd( &tor->completion, block );
1073            decrementPieceRequests( t, e->pieceIndex );
1074
1075            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1076
1077            if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1078            {
1079                const tr_piece_index_t p = e->pieceIndex;
1080                const tr_bool ok = tr_ioTestPiece( tor, p, NULL, 0 );
1081
1082                if( !ok )
1083                {
1084                    tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1085                               (unsigned long)p );
1086                }
1087
1088                tr_torrentSetHasPiece( tor, p, ok );
1089                tr_torrentSetPieceChecked( tor, p, TRUE );
1090                tr_peerMgrSetBlame( tor->session->peerMgr, tor->info.hash, p, ok );
1091
1092                if( !ok )
1093                    gotBadPiece( t, p );
1094                else
1095                {
1096                    int i;
1097                    int peerCount = tr_ptrArraySize( &t->peers );
1098                    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1099                    for( i=0; i<peerCount; ++i )
1100                        tr_peerMsgsHave( peers[i]->msgs, p );
1101                }
1102
1103                tr_torrentRecheckCompleteness( tor );
1104            }
1105            break;
1106        }
1107
1108        case TR_PEER_ERROR:
1109            if( e->err == EINVAL )
1110            {
1111                addStrike( t, peer );
1112                peer->doPurge = 1;
1113                tordbg( t, "setting %s doPurge flag because we got an EINVAL error", tr_peerIoAddrStr( &peer->addr, peer->port ) );
1114            }
1115            else if( ( e->err == ERANGE )
1116                  || ( e->err == EMSGSIZE )
1117                  || ( e->err == ENOTCONN ) )
1118            {
1119                /* some protocol error from the peer */
1120                peer->doPurge = 1;
1121                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error", tr_peerIoAddrStr( &peer->addr, peer->port ) );
1122            }
1123            else /* a local error, such as an IO error */
1124            {
1125                t->tor->error = e->err;
1126                tr_strlcpy( t->tor->errorString,
1127                            tr_strerror( t->tor->error ),
1128                            sizeof( t->tor->errorString ) );
1129                tr_torrentStop( t->tor );
1130            }
1131            break;
1132
1133        default:
1134            assert( 0 );
1135    }
1136
1137    torrentUnlock( t );
1138}
1139
1140static void
1141ensureAtomExists( Torrent          * t,
1142                  const tr_address * addr,
1143                  tr_port            port,
1144                  uint8_t            flags,
1145                  uint8_t            from )
1146{
1147    if( getExistingAtom( t, addr ) == NULL )
1148    {
1149        struct peer_atom * a;
1150        a = tr_new0( struct peer_atom, 1 );
1151        a->addr = *addr;
1152        a->port = port;
1153        a->flags = flags;
1154        a->from = from;
1155        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr( &a->addr, a->port ) );
1156        tr_ptrArrayInsertSorted( &t->pool, a, comparePeerAtoms );
1157    }
1158}
1159
1160static int
1161getMaxPeerCount( const tr_torrent * tor )
1162{
1163    return tor->maxConnectedPeers;
1164}
1165
1166static int
1167getPeerCount( const Torrent * t )
1168{
1169    return tr_ptrArraySize( &t->peers );// + tr_ptrArraySize( &t->outgoingHandshakes );
1170}
1171
1172/* FIXME: this is kind of a mess. */
1173static tr_bool
1174myHandshakeDoneCB( tr_handshake  * handshake,
1175                   tr_peerIo     * io,
1176                   int             isConnected,
1177                   const uint8_t * peer_id,
1178                   void          * vmanager )
1179{
1180    tr_bool            ok = isConnected;
1181    tr_bool            success = FALSE;
1182    tr_port            port;
1183    const tr_address * addr;
1184    tr_peerMgr       * manager = vmanager;
1185    Torrent          * t;
1186    tr_handshake     * ours;
1187
1188    assert( io );
1189    assert( isConnected == 0 || isConnected == 1 );
1190
1191    t = tr_peerIoHasTorrentHash( io )
1192        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1193        : NULL;
1194
1195    if( tr_peerIoIsIncoming ( io ) )
1196        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1197                                        handshake, handshakeCompare );
1198    else if( t )
1199        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1200                                        handshake, handshakeCompare );
1201    else
1202        ours = handshake;
1203
1204    assert( ours );
1205    assert( ours == handshake );
1206
1207    if( t )
1208        torrentLock( t );
1209
1210    addr = tr_peerIoGetAddress( io, &port );
1211
1212    if( !ok || !t || !t->isRunning )
1213    {
1214        if( t )
1215        {
1216            struct peer_atom * atom = getExistingAtom( t, addr );
1217            if( atom )
1218                ++atom->numFails;
1219        }
1220    }
1221    else /* looking good */
1222    {
1223        struct peer_atom * atom;
1224        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1225        atom = getExistingAtom( t, addr );
1226        atom->time = time( NULL );
1227        atom->piece_data_time = 0;
1228
1229        if( atom->myflags & MYFLAG_BANNED )
1230        {
1231            tordbg( t, "banned peer %s tried to reconnect",
1232                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
1233        }
1234        else if( tr_peerIoIsIncoming( io )
1235               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1236
1237        {
1238        }
1239        else
1240        {
1241            tr_peer * peer = getExistingPeer( t, addr );
1242
1243            if( peer ) /* we already have this peer */
1244            {
1245            }
1246            else
1247            {
1248                peer = getPeer( t, addr );
1249                tr_free( peer->client );
1250
1251                if( !peer_id )
1252                    peer->client = NULL;
1253                else {
1254                    char client[128];
1255                    tr_clientForId( client, sizeof( client ), peer_id );
1256                    peer->client = tr_strdup( client );
1257                }
1258
1259                peer->port = port;
1260                peer->io = tr_handshakeStealIO( handshake );
1261                tr_peerIoRef( peer->io ); /* balanced by the unref in peerDestructor() */
1262                tr_peerIoSetParent( peer->io, t->tor->bandwidth );
1263                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1264
1265                success = TRUE;
1266            }
1267        }
1268    }
1269
1270    if( !success )
1271        tr_handshakeFree( handshake );
1272
1273    if( t )
1274        torrentUnlock( t );
1275
1276    return success;
1277}
1278
1279void
1280tr_peerMgrAddIncoming( tr_peerMgr * manager,
1281                       tr_address * addr,
1282                       tr_port      port,
1283                       int          socket )
1284{
1285    managerLock( manager );
1286
1287    if( tr_sessionIsAddressBlocked( manager->session, addr ) )
1288    {
1289        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
1290        tr_netClose( socket );
1291    }
1292    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
1293    {
1294        tr_netClose( socket );
1295    }
1296    else /* we don't have a connetion to them yet... */
1297    {
1298        tr_peerIo *    io;
1299        tr_handshake * handshake;
1300
1301        io = tr_peerIoNewIncoming( manager->session, manager->session->bandwidth, addr, port, socket );
1302
1303        handshake = tr_handshakeNew( io,
1304                                     manager->session->encryptionMode,
1305                                     myHandshakeDoneCB,
1306                                     manager );
1307
1308        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
1309
1310        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
1311                                 handshakeCompare );
1312    }
1313
1314    managerUnlock( manager );
1315}
1316
1317static tr_bool
1318tr_isPex( const tr_pex * pex )
1319{
1320    return pex && tr_isAddress( &pex->addr );
1321}
1322
1323void
1324tr_peerMgrAddPex( tr_peerMgr *    manager,
1325                  const uint8_t * torrentHash,
1326                  uint8_t         from,
1327                  const tr_pex *  pex )
1328{
1329    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
1330    {
1331        Torrent * t;
1332        managerLock( manager );
1333
1334        t = getExistingTorrent( manager, torrentHash );
1335        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
1336            ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from );
1337
1338        managerUnlock( manager );
1339    }
1340}
1341
1342tr_pex *
1343tr_peerMgrCompactToPex( const void *    compact,
1344                        size_t          compactLen,
1345                        const uint8_t * added_f,
1346                        size_t          added_f_len,
1347                        size_t *        pexCount )
1348{
1349    size_t          i;
1350    size_t          n = compactLen / 6;
1351    const uint8_t * walk = compact;
1352    tr_pex *        pex = tr_new0( tr_pex, n );
1353
1354    for( i = 0; i < n; ++i )
1355    {
1356        pex[i].addr.type = TR_AF_INET;
1357        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
1358        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1359        if( added_f && ( n == added_f_len ) )
1360            pex[i].flags = added_f[i];
1361    }
1362
1363    *pexCount = n;
1364    return pex;
1365}
1366
1367tr_pex *
1368tr_peerMgrCompact6ToPex( const void    * compact,
1369                         size_t          compactLen,
1370                         const uint8_t * added_f,
1371                         size_t          added_f_len,
1372                         size_t        * pexCount )
1373{
1374    size_t          i;
1375    size_t          n = compactLen / 18;
1376    const uint8_t * walk = compact;
1377    tr_pex *        pex = tr_new0( tr_pex, n );
1378   
1379    for( i = 0; i < n; ++i )
1380    {
1381        pex[i].addr.type = TR_AF_INET6;
1382        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
1383        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1384        if( added_f && ( n == added_f_len ) )
1385            pex[i].flags = added_f[i];
1386    }
1387   
1388    *pexCount = n;
1389    return pex;
1390}
1391
1392tr_pex *
1393tr_peerMgrArrayToPex( const void * array,
1394                      size_t       arrayLen,
1395                      size_t      * pexCount )
1396{
1397    size_t          i;
1398    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
1399    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
1400    const uint8_t * walk = array;
1401    tr_pex        * pex = tr_new0( tr_pex, n );
1402   
1403    for( i = 0 ; i < n ; i++ ) {
1404        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
1405        tr_suspectAddress( &pex[i].addr, "tracker"  );
1406        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
1407        pex[i].flags = 0x00;
1408        walk += sizeof( tr_address ) + 2;
1409    }
1410   
1411    *pexCount = n;
1412    return pex;
1413}
1414
1415/**
1416***
1417**/
1418
1419void
1420tr_peerMgrSetBlame( tr_peerMgr *     manager,
1421                    const uint8_t *  torrentHash,
1422                    tr_piece_index_t pieceIndex,
1423                    int              success )
1424{
1425    if( !success )
1426    {
1427        int        peerCount, i;
1428        Torrent *  t = getExistingTorrent( manager, torrentHash );
1429        tr_peer ** peers;
1430
1431        assert( torrentIsLocked( t ) );
1432
1433        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
1434        for( i = 0; i < peerCount; ++i )
1435        {
1436            tr_peer * peer = peers[i];
1437            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1438            {
1439                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1440                        tr_peerIoAddrStr( &peer->addr, peer->port ),
1441                        pieceIndex, (int)peer->strikes + 1 );
1442                addStrike( t, peer );
1443            }
1444        }
1445    }
1446}
1447
1448int
1449tr_pexCompare( const void * va, const void * vb )
1450{
1451    const tr_pex * a = va;
1452    const tr_pex * b = vb;
1453    int i;
1454
1455    assert( tr_isPex( a ) );
1456    assert( tr_isPex( b ) );
1457
1458    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
1459        return i;
1460
1461    if( a->port != b->port )
1462        return a->port < b->port ? -1 : 1;
1463
1464    return 0;
1465}
1466
1467static int
1468peerPrefersCrypto( const tr_peer * peer )
1469{
1470    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1471        return TRUE;
1472
1473    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1474        return FALSE;
1475
1476    return tr_peerIoIsEncrypted( peer->io );
1477}
1478
1479int
1480tr_peerMgrGetPeers( tr_peerMgr      * manager,
1481                    const uint8_t   * torrentHash,
1482                    tr_pex         ** setme_pex,
1483                    uint8_t           af)
1484{
1485    int peersReturning = 0;
1486    const Torrent *  t;
1487
1488    managerLock( manager );
1489
1490    t = getExistingTorrent( manager, torrentHash );
1491    if( t == NULL )
1492    {
1493        *setme_pex = NULL;
1494    }
1495    else
1496    {
1497        int i;
1498        const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1499        const int peerCount = tr_ptrArraySize( &t->peers );
1500        /* for now, this will waste memory on torrents that have both
1501         * ipv6 and ipv4 peers */
1502        tr_pex * pex = tr_new( tr_pex, peerCount );
1503        tr_pex * walk = pex;
1504
1505        for( i=0; i<peerCount; ++i )
1506        {
1507            const tr_peer * peer = peers[i];
1508            if( peer->addr.type == af )
1509            {
1510                const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1511
1512                assert( tr_isAddress( &peer->addr ) );
1513                walk->addr = peer->addr;
1514                walk->port = peer->port;
1515                walk->flags = 0;
1516                if( peerPrefersCrypto( peer ) )
1517                    walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1518                if( ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 ) )
1519                    walk->flags |= ADDED_F_SEED_FLAG;
1520                ++peersReturning;
1521                ++walk;
1522            }
1523        }
1524
1525        assert( ( walk - pex ) == peersReturning );
1526        qsort( pex, peersReturning, sizeof( tr_pex ), tr_pexCompare );
1527        *setme_pex = pex;
1528    }
1529
1530    managerUnlock( manager );
1531    return peersReturning;
1532}
1533
1534static int reconnectPulse( void * vtorrent );
1535
1536static int rechokePulse( void * vtorrent );
1537
1538void
1539tr_peerMgrStartTorrent( tr_peerMgr *    manager,
1540                        const uint8_t * torrentHash )
1541{
1542    Torrent * t;
1543
1544    managerLock( manager );
1545
1546    t = getExistingTorrent( manager, torrentHash );
1547
1548    assert( t );
1549    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1550    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1551
1552    if( !t->isRunning )
1553    {
1554        t->isRunning = 1;
1555
1556        t->reconnectTimer = tr_timerNew( t->manager->session,
1557                                         reconnectPulse, t,
1558                                         RECONNECT_PERIOD_MSEC );
1559
1560        t->rechokeTimer = tr_timerNew( t->manager->session,
1561                                       rechokePulse, t,
1562                                       RECHOKE_PERIOD_MSEC );
1563
1564        reconnectPulse( t );
1565
1566        rechokePulse( t );
1567
1568        if( !tr_ptrArrayEmpty( &t->webseeds ) )
1569            refillSoon( t );
1570    }
1571
1572    managerUnlock( manager );
1573}
1574
1575static void
1576stopTorrent( Torrent * t )
1577{
1578    assert( torrentIsLocked( t ) );
1579
1580    t->isRunning = 0;
1581    tr_timerFree( &t->rechokeTimer );
1582    tr_timerFree( &t->reconnectTimer );
1583
1584    /* disconnect the peers. */
1585    tr_ptrArrayForeach( &t->peers, (PtrArrayForeachFunc)peerDestructor );
1586    tr_ptrArrayClear( &t->peers );
1587
1588    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1589     * which removes the handshake from t->outgoingHandshakes... */
1590    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
1591        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
1592}
1593
1594void
1595tr_peerMgrStopTorrent( tr_peerMgr *    manager,
1596                       const uint8_t * torrentHash )
1597{
1598    managerLock( manager );
1599
1600    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1601
1602    managerUnlock( manager );
1603}
1604
1605void
1606tr_peerMgrAddTorrent( tr_peerMgr * manager,
1607                      tr_torrent * tor )
1608{
1609    Torrent * t;
1610
1611    managerLock( manager );
1612
1613    assert( tor );
1614    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1615
1616    t = torrentConstructor( manager, tor );
1617    tr_ptrArrayInsertSorted( &manager->torrents, t, torrentCompare );
1618
1619    managerUnlock( manager );
1620}
1621
1622void
1623tr_peerMgrRemoveTorrent( tr_peerMgr *    manager,
1624                         const uint8_t * torrentHash )
1625{
1626    Torrent * t;
1627
1628    managerLock( manager );
1629
1630    t = getExistingTorrent( manager, torrentHash );
1631    assert( t );
1632    stopTorrent( t );
1633    tr_ptrArrayRemoveSorted( &manager->torrents, t, torrentCompare );
1634    torrentDestructor( t );
1635
1636    managerUnlock( manager );
1637}
1638
1639void
1640tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1641                               const uint8_t *    torrentHash,
1642                               int8_t *           tab,
1643                               unsigned int       tabCount )
1644{
1645    tr_piece_index_t   i;
1646    const Torrent *    t;
1647    const tr_torrent * tor;
1648    float              interval;
1649    tr_bool            isSeed;
1650    int                peerCount;
1651    const tr_peer **   peers;
1652    managerLock( manager );
1653
1654    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1655    tor = t->tor;
1656    interval = tor->info.pieceCount / (float)tabCount;
1657    isSeed = tor && ( tr_cpGetStatus ( &tor->completion ) == TR_SEED );
1658    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1659    peerCount = tr_ptrArraySize( &t->peers );
1660
1661    memset( tab, 0, tabCount );
1662
1663    for( i = 0; tor && i < tabCount; ++i )
1664    {
1665        const int piece = i * interval;
1666
1667        if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
1668            tab[i] = -1;
1669        else if( peerCount ) {
1670            int j;
1671            for( j = 0; j < peerCount; ++j )
1672                if( tr_bitfieldHas( peers[j]->have, i ) )
1673                    ++tab[i];
1674        }
1675    }
1676
1677    managerUnlock( manager );
1678}
1679
1680/* Returns the pieces that are available from peers */
1681tr_bitfield*
1682tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1683                        const uint8_t *    torrentHash )
1684{
1685    int i;
1686    int peerCount;
1687    Torrent * t;
1688    const tr_peer ** peers;
1689    tr_bitfield * pieces;
1690    managerLock( manager );
1691
1692    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1693    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1694    peerCount = tr_ptrArraySize( &t->peers );
1695    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1696    for( i=0; i<peerCount; ++i )
1697        tr_bitfieldOr( pieces, peers[i]->have );
1698
1699    managerUnlock( manager );
1700    return pieces;
1701}
1702
1703int
1704tr_peerMgrHasConnections( const tr_peerMgr * manager,
1705                          const uint8_t *    torrentHash )
1706{
1707    int ret;
1708    const Torrent * t;
1709    managerLock( manager );
1710
1711    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1712    ret = t && ( !tr_ptrArrayEmpty( &t->peers ) || !tr_ptrArrayEmpty( &t->webseeds ) );
1713
1714    managerUnlock( manager );
1715    return ret;
1716}
1717
1718void
1719tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1720                        const uint8_t    * torrentHash,
1721                        int              * setmePeersKnown,
1722                        int              * setmePeersConnected,
1723                        int              * setmeSeedsConnected,
1724                        int              * setmeWebseedsSendingToUs,
1725                        int              * setmePeersSendingToUs,
1726                        int              * setmePeersGettingFromUs,
1727                        int              * setmePeersFrom )
1728{
1729    int i, size;
1730    const Torrent * t;
1731    const tr_peer ** peers;
1732    const tr_webseed ** webseeds;
1733
1734    managerLock( manager );
1735
1736    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1737    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1738    size = tr_ptrArraySize( &t->peers );
1739
1740    *setmePeersKnown           = tr_ptrArraySize( &t->pool );
1741    *setmePeersConnected       = 0;
1742    *setmeSeedsConnected       = 0;
1743    *setmePeersGettingFromUs   = 0;
1744    *setmePeersSendingToUs     = 0;
1745    *setmeWebseedsSendingToUs  = 0;
1746
1747    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1748        setmePeersFrom[i] = 0;
1749
1750    for( i=0; i<size; ++i )
1751    {
1752        const tr_peer * peer = peers[i];
1753        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1754
1755        if( peer->io == NULL ) /* not connected */
1756            continue;
1757
1758        ++*setmePeersConnected;
1759
1760        ++setmePeersFrom[atom->from];
1761
1762        if( clientIsDownloadingFrom( peer ) )
1763            ++*setmePeersSendingToUs;
1764
1765        if( clientIsUploadingTo( peer ) )
1766            ++*setmePeersGettingFromUs;
1767
1768        if( atom->flags & ADDED_F_SEED_FLAG )
1769            ++*setmeSeedsConnected;
1770    }
1771
1772    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1773    size = tr_ptrArraySize( &t->webseeds );
1774    for( i=0; i<size; ++i )
1775        if( tr_webseedIsActive( webseeds[i] ) )
1776            ++*setmeWebseedsSendingToUs;
1777
1778    managerUnlock( manager );
1779}
1780
1781float*
1782tr_peerMgrWebSpeeds( const tr_peerMgr * manager,
1783                     const uint8_t *    torrentHash )
1784{
1785    const Torrent * t;
1786    const tr_webseed ** webseeds;
1787    int i;
1788    int webseedCount;
1789    float * ret;
1790    uint64_t now;
1791
1792    assert( manager );
1793    managerLock( manager );
1794
1795    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1796    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1797    webseedCount = tr_ptrArraySize( &t->webseeds );
1798    assert( webseedCount == t->tor->info.webseedCount );
1799    ret = tr_new0( float, webseedCount );
1800    now = tr_date( );
1801
1802    for( i=0; i<webseedCount; ++i )
1803        if( !tr_webseedGetSpeed( webseeds[i], now, &ret[i] ) )
1804            ret[i] = -1.0;
1805
1806    managerUnlock( manager );
1807    return ret;
1808}
1809
1810double
1811tr_peerGetPieceSpeed( const tr_peer * peer, uint64_t now, tr_direction direction )
1812{
1813    return peer->io ? tr_peerIoGetPieceSpeed( peer->io, now, direction ) : 0.0;
1814}
1815
1816
1817struct tr_peer_stat *
1818tr_peerMgrPeerStats( const   tr_peerMgr  * manager,
1819                     const   uint8_t     * torrentHash,
1820                     int                 * setmeCount )
1821{
1822    int i, size;
1823    const Torrent * t;
1824    const tr_peer ** peers;
1825    tr_peer_stat * ret;
1826    uint64_t now;
1827
1828    assert( manager );
1829    managerLock( manager );
1830
1831    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1832    size = tr_ptrArraySize( &t->peers );
1833    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1834    ret = tr_new0( tr_peer_stat, size );
1835    now = tr_date( );
1836
1837    for( i = 0; i < size; ++i )
1838    {
1839        char *                   pch;
1840        const tr_peer *          peer = peers[i];
1841        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1842        tr_peer_stat *           stat = ret + i;
1843        tr_address               norm_addr;
1844
1845        norm_addr = peer->addr;
1846        tr_normalizeV4Mapped( &norm_addr );
1847        tr_ntop( &norm_addr, stat->addr, sizeof( stat->addr ) );
1848        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
1849                   sizeof( stat->client ) );
1850        stat->port               = ntohs( peer->port );
1851        stat->from               = atom->from;
1852        stat->progress           = peer->progress;
1853        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1854        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER );
1855        stat->rateToClient       = tr_peerGetPieceSpeed( peer, now, TR_PEER_TO_CLIENT );
1856        stat->peerIsChoked       = peer->peerIsChoked;
1857        stat->peerIsInterested   = peer->peerIsInterested;
1858        stat->clientIsChoked     = peer->clientIsChoked;
1859        stat->clientIsInterested = peer->clientIsInterested;
1860        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1861        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1862        stat->isUploadingTo      = clientIsUploadingTo( peer );
1863        stat->isSeed             = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
1864
1865        pch = stat->flagStr;
1866        if( t->optimistic == peer ) *pch++ = 'O';
1867        if( stat->isDownloadingFrom ) *pch++ = 'D';
1868        else if( stat->clientIsInterested ) *pch++ = 'd';
1869        if( stat->isUploadingTo ) *pch++ = 'U';
1870        else if( stat->peerIsInterested ) *pch++ = 'u';
1871        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
1872        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1873        if( stat->isEncrypted ) *pch++ = 'E';
1874        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1875        if( stat->isIncoming ) *pch++ = 'I';
1876        *pch = '\0';
1877    }
1878
1879    *setmeCount = size;
1880
1881    managerUnlock( manager );
1882    return ret;
1883}
1884
1885/**
1886***
1887**/
1888
1889struct ChokeData
1890{
1891    tr_bool         doUnchoke;
1892    tr_bool         isInterested;
1893    tr_bool         isChoked;
1894    int             rate;
1895    tr_peer *       peer;
1896};
1897
1898static int
1899compareChoke( const void * va,
1900              const void * vb )
1901{
1902    const struct ChokeData * a = va;
1903    const struct ChokeData * b = vb;
1904
1905    if( a->rate != b->rate ) /* prefer higher overall speeds */
1906        return a->rate > b->rate ? -1 : 1;
1907
1908    if( a->isChoked != b->isChoked ) /* prefer unchoked */
1909        return a->isChoked ? 1 : -1;
1910
1911    return 0;
1912}
1913
1914static int
1915isNew( const tr_peer * peer )
1916{
1917    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1918}
1919
1920static int
1921isSame( const tr_peer * peer )
1922{
1923    return peer && peer->client && strstr( peer->client, "Transmission" );
1924}
1925
1926/**
1927***
1928**/
1929
1930static void
1931rechoke( Torrent * t )
1932{
1933    int i, size, unchokedInterested;
1934    const int peerCount = tr_ptrArraySize( &t->peers );
1935    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1936    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1937    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
1938    const uint64_t now = tr_date( );
1939
1940    assert( torrentIsLocked( t ) );
1941
1942    /* sort the peers by preference and rate */
1943    for( i = 0, size = 0; i < peerCount; ++i )
1944    {
1945        tr_peer * peer = peers[i];
1946        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1947
1948        if( peer->progress >= 1.0 ) /* choke all seeds */
1949        {
1950            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1951        }
1952        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
1953        {
1954            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1955        }
1956        else if( chokeAll ) /* choke everyone if we're not uploading */
1957        {
1958            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1959        }
1960        else
1961        {
1962            struct ChokeData * n = &choke[size++];
1963            n->peer         = peer;
1964            n->isInterested = peer->peerIsInterested;
1965            n->isChoked     = peer->peerIsChoked;
1966            n->rate         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER ) * 1024;
1967        }
1968    }
1969
1970    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
1971
1972    /**
1973     * Reciprocation and number of uploads capping is managed by unchoking
1974     * the N peers which have the best upload rate and are interested.
1975     * This maximizes the client's download rate. These N peers are
1976     * referred to as downloaders, because they are interested in downloading
1977     * from the client.
1978     *
1979     * Peers which have a better upload rate (as compared to the downloaders)
1980     * but aren't interested get unchoked. If they become interested, the
1981     * downloader with the worst upload rate gets choked. If a client has
1982     * a complete file, it uses its upload rate rather than its download
1983     * rate to decide which peers to unchoke.
1984     */
1985    unchokedInterested = 0;
1986    for( i=0; i<size && unchokedInterested<MAX_UNCHOKED_PEERS; ++i ) {
1987        choke[i].doUnchoke = 1;
1988        if( choke[i].isInterested )
1989            ++unchokedInterested;
1990    }
1991
1992    /* optimistic unchoke */
1993    if( i < size )
1994    {
1995        int n;
1996        struct ChokeData * c;
1997        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
1998
1999        for( ; i<size; ++i )
2000        {
2001            if( choke[i].isInterested )
2002            {
2003                const tr_peer * peer = choke[i].peer;
2004                int x = 1, y;
2005                if( isNew( peer ) ) x *= 3;
2006                if( isSame( peer ) ) x *= 3;
2007                for( y=0; y<x; ++y )
2008                    tr_ptrArrayAppend( &randPool, &choke[i] );
2009            }
2010        }
2011
2012        if(( n = tr_ptrArraySize( &randPool )))
2013        {
2014            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
2015            c->doUnchoke = 1;
2016            t->optimistic = c->peer;
2017        }
2018
2019        tr_ptrArrayDestruct( &randPool, NULL );
2020    }
2021
2022    for( i=0; i<size; ++i )
2023        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
2024
2025    /* cleanup */
2026    tr_free( choke );
2027}
2028
2029static int
2030rechokePulse( void * vtorrent )
2031{
2032    Torrent * t = vtorrent;
2033
2034    torrentLock( t );
2035    rechoke( t );
2036    torrentUnlock( t );
2037    return TRUE;
2038}
2039
2040/***
2041****
2042****  Life and Death
2043****
2044***/
2045
2046static int
2047shouldPeerBeClosed( const Torrent * t,
2048                    const tr_peer * peer,
2049                    int             peerCount )
2050{
2051    const tr_torrent *       tor = t->tor;
2052    const time_t             now = time( NULL );
2053    const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
2054
2055    /* if it's marked for purging, close it */
2056    if( peer->doPurge )
2057    {
2058        tordbg( t, "purging peer %s because its doPurge flag is set",
2059                tr_peerIoAddrStr( &atom->addr, atom->port ) );
2060        return TRUE;
2061    }
2062
2063    /* if we're seeding and the peer has everything we have,
2064     * and enough time has passed for a pex exchange, then disconnect */
2065    if( tr_torrentIsSeed( tor ) )
2066    {
2067        int peerHasEverything;
2068        if( atom->flags & ADDED_F_SEED_FLAG )
2069            peerHasEverything = TRUE;
2070        else if( peer->progress < tr_cpPercentDone( &tor->completion ) )
2071            peerHasEverything = FALSE;
2072        else {
2073            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( &tor->completion ) );
2074            tr_bitfieldDifference( tmp, peer->have );
2075            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2076            tr_bitfieldFree( tmp );
2077        }
2078
2079        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 )))
2080        {
2081            tordbg( t, "purging peer %s because we're both seeds",
2082                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
2083            return TRUE;
2084        }
2085    }
2086
2087    /* disconnect if it's been too long since piece data has been transferred.
2088     * this is on a sliding scale based on number of available peers... */
2089    {
2090        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2091        /* if we have >= relaxIfFewerThan, strictness is 100%.
2092         * if we have zero connections, strictness is 0% */
2093        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
2094            ? 1.0
2095            : peerCount / (float)relaxStrictnessIfFewerThanN;
2096        const int lo = MIN_UPLOAD_IDLE_SECS;
2097        const int hi = MAX_UPLOAD_IDLE_SECS;
2098        const int limit = hi - ( ( hi - lo ) * strictness );
2099        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
2100/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
2101        if( idleTime > limit ) {
2102            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
2103                       tr_peerIoAddrStr( &atom->addr, atom->port ), idleTime );
2104            return TRUE;
2105        }
2106    }
2107
2108    return FALSE;
2109}
2110
2111static tr_peer **
2112getPeersToClose( Torrent * t, int * setmeSize )
2113{
2114    int i, peerCount, outsize;
2115    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
2116    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2117
2118    assert( torrentIsLocked( t ) );
2119
2120    for( i = outsize = 0; i < peerCount; ++i )
2121        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
2122            ret[outsize++] = peers[i];
2123
2124    *setmeSize = outsize;
2125    return ret;
2126}
2127
2128static int
2129compareCandidates( const void * va,
2130                   const void * vb )
2131{
2132    const struct peer_atom * a = *(const struct peer_atom**) va;
2133    const struct peer_atom * b = *(const struct peer_atom**) vb;
2134
2135    /* <Charles> Here we would probably want to try reconnecting to
2136     * peers that had most recently given us data. Lots of users have
2137     * trouble with resets due to their routers and/or ISPs. This way we
2138     * can quickly recover from an unwanted reset. So we sort
2139     * piece_data_time in descending order.
2140     */
2141
2142    if( a->piece_data_time != b->piece_data_time )
2143        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2144
2145    if( a->numFails != b->numFails )
2146        return a->numFails < b->numFails ? -1 : 1;
2147
2148    if( a->time != b->time )
2149        return a->time < b->time ? -1 : 1;
2150
2151    /* all other things being equal, prefer peers whose
2152     * information comes from a more reliable source */
2153    if( a->from != b->from )
2154        return a->from < b->from ? -1 : 1;
2155
2156    return 0;
2157}
2158
2159static int
2160getReconnectIntervalSecs( const struct peer_atom * atom )
2161{
2162    int          sec;
2163    const time_t now = time( NULL );
2164
2165    /* if we were recently connected to this peer and transferring piece
2166     * data, try to reconnect to them sooner rather that later -- we don't
2167     * want network troubles to get in the way of a good peer. */
2168    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2169        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2170
2171    /* don't allow reconnects more often than our minimum */
2172    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2173        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2174
2175    /* otherwise, the interval depends on how many times we've tried
2176     * and failed to connect to the peer */
2177    else switch( atom->numFails ) {
2178        case 0: sec = 0; break;
2179        case 1: sec = 5; break;
2180        case 2: sec = 2 * 60; break;
2181        case 3: sec = 15 * 60; break;
2182        case 4: sec = 30 * 60; break;
2183        case 5: sec = 60 * 60; break;
2184        default: sec = 120 * 60; break;
2185    }
2186
2187    return sec;
2188}
2189
2190static struct peer_atom **
2191getPeerCandidates( Torrent * t, int * setmeSize )
2192{
2193    int                 i, atomCount, retCount;
2194    struct peer_atom ** atoms;
2195    struct peer_atom ** ret;
2196    const time_t        now = time( NULL );
2197    const int           seed = tr_torrentIsSeed( t->tor );
2198
2199    assert( torrentIsLocked( t ) );
2200
2201    atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
2202    ret = tr_new( struct peer_atom*, atomCount );
2203    for( i = retCount = 0; i < atomCount; ++i )
2204    {
2205        int                interval;
2206        struct peer_atom * atom = atoms[i];
2207
2208        /* peer fed us too much bad data ... we only keep it around
2209         * now to weed it out in case someone sends it to us via pex */
2210        if( atom->myflags & MYFLAG_BANNED )
2211            continue;
2212
2213        /* peer was unconnectable before, so we're not going to keep trying.
2214         * this is needs a separate flag from `banned', since if they try
2215         * to connect to us later, we'll let them in */
2216        if( atom->myflags & MYFLAG_UNREACHABLE )
2217            continue;
2218
2219        /* we don't need two connections to the same peer... */
2220        if( peerIsInUse( t, &atom->addr ) )
2221            continue;
2222
2223        /* no need to connect if we're both seeds... */
2224        if( seed && ( ( atom->flags & ADDED_F_SEED_FLAG ) ||
2225                      ( atom->uploadOnly == UPLOAD_ONLY_YES ) ) )
2226            continue;
2227
2228        /* don't reconnect too often */
2229        interval = getReconnectIntervalSecs( atom );
2230        if( ( now - atom->time ) < interval )
2231        {
2232            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2233                    i, tr_peerIoAddrStr( &atom->addr, atom->port ), interval );
2234            continue;
2235        }
2236
2237        /* Don't connect to peers in our blocklist */
2238        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2239            continue;
2240
2241        ret[retCount++] = atom;
2242    }
2243
2244    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2245    *setmeSize = retCount;
2246    return ret;
2247}
2248
2249static int
2250reconnectPulse( void * vtorrent )
2251{
2252    Torrent *     t = vtorrent;
2253    static time_t prevTime = 0;
2254    static int    newConnectionsThisSecond = 0;
2255    time_t        now;
2256
2257    torrentLock( t );
2258
2259    now = time( NULL );
2260    if( prevTime != now )
2261    {
2262        prevTime = now;
2263        newConnectionsThisSecond = 0;
2264    }
2265
2266    if( !t->isRunning )
2267    {
2268        removeAllPeers( t );
2269    }
2270    else
2271    {
2272        int i, nCandidates, nBad;
2273        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
2274        struct tr_peer ** connections = getPeersToClose( t, &nBad );
2275
2276        //if( nBad || nCandidates )
2277            tordbg( t, "reconnect pulse for [%s]: %d bad connections, "
2278                    "%d connection candidates, %d atoms, max per pulse is %d",
2279                    t->tor->info.name, nBad, nCandidates,
2280                    tr_ptrArraySize( &t->pool ),
2281                    (int)MAX_RECONNECTIONS_PER_PULSE );
2282
2283        /* disconnect some peers.
2284           if we transferred piece data, then they might be good peers,
2285           so reset their `numFails' weight to zero.  otherwise we connected
2286           to them fruitlessly, so mark it as another fail */
2287        for( i = 0; i < nBad; ++i ) {
2288            tr_peer * peer = connections[i];
2289            struct peer_atom * atom = getExistingAtom( t, &peer->addr );
2290            if( atom->piece_data_time )
2291                atom->numFails = 0;
2292            else
2293                ++atom->numFails;
2294            tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
2295            removePeer( t, peer );
2296        }
2297
2298tordbg( t, "nCandidates is %d, MAX_RECONNECTIONS_PER_PULSE is %d, getPeerCount(t) is %d, getMaxPeerCount(t) is %d, newConnectionsThisSecond is %d, MAX_CONNECTIONS_PER_SECOND is %d",
2299        (int)nCandidates, (int)MAX_RECONNECTIONS_PER_PULSE, (int)getPeerCount( t ), (int)getMaxPeerCount( t->tor ), (int)newConnectionsThisSecond, (int)MAX_CONNECTIONS_PER_SECOND );
2300
2301        /* add some new ones */
2302        for( i = 0;    ( i < nCandidates )
2303           && ( i < MAX_RECONNECTIONS_PER_PULSE )
2304           && ( getPeerCount( t ) < getMaxPeerCount( t->tor ) )
2305           && ( newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND ); ++i )
2306        {
2307            tr_peerMgr *       mgr = t->manager;
2308            struct peer_atom * atom = candidates[i];
2309            tr_peerIo *        io;
2310
2311            tordbg( t, "Starting an OUTGOING connection with %s",
2312                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2313
2314            io = tr_peerIoNewOutgoing( mgr->session, mgr->session->bandwidth, &atom->addr, atom->port, t->hash );
2315
2316            if( io == NULL )
2317            {
2318                tordbg( t, "peerIo not created; marking peer %s as unreachable",
2319                        tr_peerIoAddrStr( &atom->addr, atom->port ) );
2320                atom->myflags |= MYFLAG_UNREACHABLE;
2321            }
2322            else
2323            {
2324                tr_handshake * handshake = tr_handshakeNew( io,
2325                                                            mgr->session->encryptionMode,
2326                                                            myHandshakeDoneCB,
2327                                                            mgr );
2328
2329                assert( tr_peerIoGetTorrentHash( io ) );
2330
2331                tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewOutgoing() */
2332
2333                ++newConnectionsThisSecond;
2334
2335                tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
2336                                         handshakeCompare );
2337            }
2338
2339            atom->time = time( NULL );
2340        }
2341
2342        /* cleanup */
2343        tr_free( connections );
2344        tr_free( candidates );
2345    }
2346
2347    torrentUnlock( t );
2348    return TRUE;
2349}
2350
2351/****
2352*****
2353*****  BANDWIDTH ALLOCATION
2354*****
2355****/
2356
2357static void
2358pumpAllPeers( tr_peerMgr * mgr )
2359{
2360    const int torrentCount = tr_ptrArraySize( &mgr->torrents );
2361    int       i, j;
2362
2363    for( i=0; i<torrentCount; ++i )
2364    {
2365        Torrent * t = tr_ptrArrayNth( &mgr->torrents, i );
2366        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
2367        {
2368            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
2369            tr_peerMsgsPulse( peer->msgs );
2370        }
2371    }
2372}
2373
2374static int
2375bandwidthPulse( void * vmgr )
2376{
2377    tr_peerMgr * mgr = vmgr;
2378    managerLock( mgr );
2379
2380    /* FIXME: this next line probably isn't necessary... */
2381    pumpAllPeers( mgr );
2382
2383    /* allocate bandwidth to the peers */
2384    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
2385    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
2386
2387    managerUnlock( mgr );
2388    return TRUE;
2389}
Note: See TracBrowser for help on using the repository browser.