source: trunk/libtransmission/peer-mgr.c @ 7623

Last change on this file since 7623 was 7623, checked in by charles, 12 years ago

(trunk libT) two bugfixes in one: (1) fix leaking tr_handshake objects reported by BentMyWookie? (2) fix yet another permutation of the tr_isBandwidth() assertion failure -- maybe the last one? -- reported by Waldorf

  • Property svn:keywords set to Date Rev Author Id
File size: 68.1 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 7623 2009-01-06 00:24:44Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h> /* memcpy, memcmp, strstr */
16#include <stdlib.h> /* qsort */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "session.h"
23#include "bandwidth.h"
24#include "bencode.h"
25#include "blocklist.h"
26#include "clients.h"
27#include "completion.h"
28#include "crypto.h"
29#include "handshake.h"
30#include "inout.h" /* tr_ioTestPiece */
31#include "net.h"
32#include "peer-io.h"
33#include "peer-mgr.h"
34#include "peer-mgr-private.h"
35#include "peer-msgs.h"
36#include "ptrarray.h"
37#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
38#include "torrent.h"
39#include "trevent.h"
40#include "utils.h"
41#include "webseed.h"
42
43enum
44{
45    /* how frequently to change which peers are choked */
46    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
47
48    /* minimum interval for refilling peers' request lists */
49    REFILL_PERIOD_MSEC = 333,
50
51    /* when many peers are available, keep idle ones this long */
52    MIN_UPLOAD_IDLE_SECS = ( 30 ),
53
54    /* when few peers are available, keep idle ones this long */
55    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
56
57    /* how frequently to decide which peers live and die */
58    RECONNECT_PERIOD_MSEC = ( 2 * 1000 ),
59   
60    /* how frequently to reallocate bandwidth */
61    BANDWIDTH_PERIOD_MSEC = 500,
62
63    /* max # of peers to ask fer per torrent per reconnect pulse */
64    MAX_RECONNECTIONS_PER_PULSE = 16,
65
66    /* max number of peers to ask for per second overall.
67    * this throttle is to avoid overloading the router */
68    MAX_CONNECTIONS_PER_SECOND = 32,
69
70    /* number of unchoked peers per torrent.
71     * FIXME: this probably ought to be configurable */
72    MAX_UNCHOKED_PEERS = 14,
73
74    /* number of bad pieces a peer is allowed to send before we ban them */
75    MAX_BAD_PIECES_PER_PEER = 5,
76
77    /* use for bitwise operations w/peer_atom.myflags */
78    MYFLAG_BANNED = 1,
79
80    /* unreachable for now... but not banned.
81     * if they try to connect to us it's okay */
82    MYFLAG_UNREACHABLE = 2,
83
84    /* the minimum we'll wait before attempting to reconnect to a peer */
85    MINIMUM_RECONNECT_INTERVAL_SECS = 5
86};
87
88
89/**
90***
91**/
92
93enum
94{
95    UPLOAD_ONLY_UKNOWN,
96    UPLOAD_ONLY_YES,
97    UPLOAD_ONLY_NO
98};
99
100/**
101 * Peer information that should be kept even before we've connected and
102 * after we've disconnected.  These are kept in a pool of peer_atoms to decide
103 * which ones would make good candidates for connecting to, and to watch out
104 * for banned peers.
105 *
106 * @see tr_peer
107 * @see tr_peermsgs
108 */
109struct peer_atom
110{
111    uint8_t     from;
112    uint8_t     flags;       /* these match the added_f flags */
113    uint8_t     myflags;     /* flags that aren't defined in added_f */
114    uint8_t     uploadOnly;  /* UPLOAD_ONLY_ */
115    tr_port     port;
116    uint16_t    numFails;
117    tr_address  addr;
118    time_t      time;        /* when the peer's connection status last changed */
119    time_t      piece_data_time;
120};
121
122typedef struct
123{
124    tr_bool         isRunning;
125
126    uint8_t         hash[SHA_DIGEST_LENGTH];
127    int         *   pendingRequestCount;
128    tr_ptrArray     outgoingHandshakes; /* tr_handshake */
129    tr_ptrArray     pool; /* struct peer_atom */
130    tr_ptrArray     peers; /* tr_peer */
131    tr_ptrArray     webseeds; /* tr_webseed */
132    tr_timer *      reconnectTimer;
133    tr_timer *      rechokeTimer;
134    tr_timer *      refillTimer;
135    tr_torrent *    tor;
136    tr_peer *       optimistic; /* the optimistic peer, or NULL if none */
137
138    struct tr_peerMgr * manager;
139}
140Torrent;
141
142struct tr_peerMgr
143{
144    tr_session      * session;
145    tr_ptrArray       torrents; /* Torrent */
146    tr_ptrArray       incomingHandshakes; /* tr_handshake */
147    tr_timer        * bandwidthTimer;
148};
149
150#define tordbg( t, ... ) \
151    do { \
152        if( tr_deepLoggingIsActive( ) ) \
153            tr_deepLog( __FILE__, __LINE__, t->tor->info.name, __VA_ARGS__ ); \
154    } while( 0 )
155
156#define dbgmsg( ... ) \
157    do { \
158        if( tr_deepLoggingIsActive( ) ) \
159            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
160    } while( 0 )
161
162/**
163***
164**/
165
166static inline void
167managerLock( const struct tr_peerMgr * manager )
168{
169    tr_globalLock( manager->session );
170}
171
172static inline void
173managerUnlock( const struct tr_peerMgr * manager )
174{
175    tr_globalUnlock( manager->session );
176}
177
178static inline void
179torrentLock( Torrent * torrent )
180{
181    managerLock( torrent->manager );
182}
183
184static inline void
185torrentUnlock( Torrent * torrent )
186{
187    managerUnlock( torrent->manager );
188}
189
190static inline int
191torrentIsLocked( const Torrent * t )
192{
193    return tr_globalIsLocked( t->manager->session );
194}
195
196/**
197***
198**/
199
200static int
201handshakeCompareToAddr( const void * va, const void * vb )
202{
203    const tr_handshake * a = va;
204
205    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
206}
207
208static int
209handshakeCompare( const void * a, const void * b )
210{
211    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
212}
213
214static tr_handshake*
215getExistingHandshake( tr_ptrArray      * handshakes,
216                      const tr_address * addr )
217{
218    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
219}
220
221static int
222comparePeerAtomToAddress( const void * va, const void * vb )
223{
224    const struct peer_atom * a = va;
225
226    return tr_compareAddresses( &a->addr, vb );
227}
228
229static int
230comparePeerAtoms( const void * va, const void * vb )
231{
232    const struct peer_atom * b = vb;
233
234    return comparePeerAtomToAddress( va, &b->addr );
235}
236
237/**
238***
239**/
240
241static int
242torrentCompare( const void * va,
243                const void * vb )
244{
245    const Torrent * a = va;
246    const Torrent * b = vb;
247
248    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
249}
250
251static int
252torrentCompareToHash( const void * va,
253                      const void * vb )
254{
255    const Torrent * a = va;
256    const uint8_t * b_hash = vb;
257
258    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
259}
260
261static Torrent*
262getExistingTorrent( tr_peerMgr *    manager,
263                    const uint8_t * hash )
264{
265    return (Torrent*) tr_ptrArrayFindSorted( &manager->torrents,
266                                             hash,
267                                             torrentCompareToHash );
268}
269
270static int
271peerCompare( const void * va, const void * vb )
272{
273    const tr_peer * a = va;
274    const tr_peer * b = vb;
275
276    return tr_compareAddresses( &a->addr, &b->addr );
277}
278
279static int
280peerCompareToAddr( const void * va, const void * vb )
281{
282    const tr_peer * a = va;
283
284    return tr_compareAddresses( &a->addr, vb );
285}
286
287static tr_peer*
288getExistingPeer( Torrent          * torrent,
289                 const tr_address * addr )
290{
291    assert( torrentIsLocked( torrent ) );
292    assert( addr );
293
294    return tr_ptrArrayFindSorted( &torrent->peers, addr, peerCompareToAddr );
295}
296
297static struct peer_atom*
298getExistingAtom( const Torrent    * t,
299                 const tr_address * addr )
300{
301    Torrent * tt = (Torrent*)t;
302    assert( torrentIsLocked( t ) );
303    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
304}
305
306static tr_bool
307peerIsInUse( const Torrent    * ct,
308             const tr_address * addr )
309{
310    Torrent * t = (Torrent*) ct;
311
312    assert( torrentIsLocked ( t ) );
313
314    return getExistingPeer( t, addr )
315        || getExistingHandshake( &t->outgoingHandshakes, addr )
316        || getExistingHandshake( &t->manager->incomingHandshakes, addr );
317}
318
319static tr_peer*
320peerConstructor( const tr_address * addr )
321{
322    tr_peer * p;
323    p = tr_new0( tr_peer, 1 );
324    p->addr = *addr;
325    return p;
326}
327
328static tr_peer*
329getPeer( Torrent          * torrent,
330         const tr_address * addr )
331{
332    tr_peer * peer;
333
334    assert( torrentIsLocked( torrent ) );
335
336    peer = getExistingPeer( torrent, addr );
337
338    if( peer == NULL )
339    {
340        peer = peerConstructor( addr );
341        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
342    }
343
344    return peer;
345}
346
347static void
348peerDestructor( tr_peer * peer )
349{
350    assert( peer );
351
352    if( peer->msgs != NULL )
353    {
354        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
355        tr_peerMsgsFree( peer->msgs );
356    }
357
358    tr_peerIoClear( peer->io );
359    tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
360
361    tr_bitfieldFree( peer->have );
362    tr_bitfieldFree( peer->blame );
363    tr_free( peer->client );
364
365    tr_free( peer );
366}
367
368static void
369removePeer( Torrent * t,
370            tr_peer * peer )
371{
372    tr_peer *          removed;
373    struct peer_atom * atom;
374
375    assert( torrentIsLocked( t ) );
376
377    atom = getExistingAtom( t, &peer->addr );
378    assert( atom );
379    atom->time = time( NULL );
380
381    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
382    assert( removed == peer );
383    peerDestructor( removed );
384}
385
386static void
387removeAllPeers( Torrent * t )
388{
389    while( !tr_ptrArrayEmpty( &t->peers ) )
390        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
391}
392
393static void
394torrentDestructor( void * vt )
395{
396    Torrent * t = vt;
397    uint8_t   hash[SHA_DIGEST_LENGTH];
398
399    assert( t );
400    assert( !t->isRunning );
401    assert( torrentIsLocked( t ) );
402    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
403    assert( tr_ptrArrayEmpty( &t->peers ) );
404
405    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
406
407    tr_timerFree( &t->reconnectTimer );
408    tr_timerFree( &t->rechokeTimer );
409    tr_timerFree( &t->refillTimer );
410
411    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
412    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
413    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
414    tr_ptrArrayDestruct( &t->peers, NULL );
415
416    tr_free( t->pendingRequestCount );
417    tr_free( t );
418}
419
420static void peerCallbackFunc( void * vpeer,
421                              void * vevent,
422                              void * vt );
423
424static Torrent*
425torrentConstructor( tr_peerMgr * manager,
426                    tr_torrent * tor )
427{
428    int       i;
429    Torrent * t;
430
431    t = tr_new0( Torrent, 1 );
432    t->manager = manager;
433    t->tor = tor;
434    t->pool = TR_PTR_ARRAY_INIT;
435    t->peers = TR_PTR_ARRAY_INIT;
436    t->webseeds = TR_PTR_ARRAY_INIT;
437    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
438    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
439
440    for( i = 0; i < tor->info.webseedCount; ++i )
441    {
442        tr_webseed * w =
443            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
444        tr_ptrArrayAppend( &t->webseeds, w );
445    }
446
447    return t;
448}
449
450
451static int bandwidthPulse( void * vmgr );
452
453
454tr_peerMgr*
455tr_peerMgrNew( tr_session * session )
456{
457    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
458
459    m->session = session;
460    m->torrents = TR_PTR_ARRAY_INIT;
461    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
462    m->bandwidthTimer = tr_timerNew( session, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
463    return m;
464}
465
466void
467tr_peerMgrFree( tr_peerMgr * manager )
468{
469    managerLock( manager );
470
471    tr_timerFree( &manager->bandwidthTimer );
472
473    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
474     * the item from manager->handshakes, so this is a little roundabout... */
475    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
476        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
477
478    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
479
480    /* free the torrents. */
481    tr_ptrArrayDestruct( &manager->torrents, torrentDestructor );
482
483    managerUnlock( manager );
484    tr_free( manager );
485}
486
487static int
488clientIsDownloadingFrom( const tr_peer * peer )
489{
490    return peer->clientIsInterested && !peer->clientIsChoked;
491}
492
493static int
494clientIsUploadingTo( const tr_peer * peer )
495{
496    return peer->peerIsInterested && !peer->peerIsChoked;
497}
498
499/***
500****
501***/
502
503tr_bool
504tr_peerMgrPeerIsSeed( const tr_peerMgr  * mgr,
505                      const uint8_t     * torrentHash,
506                      const tr_address  * addr )
507{
508    tr_bool isSeed = FALSE;
509    const Torrent * t = NULL;
510    const struct peer_atom * atom = NULL;
511
512    t = getExistingTorrent( (tr_peerMgr*)mgr, torrentHash );
513    if( t )
514        atom = getExistingAtom( t, addr );
515    if( atom )
516        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
517
518    return isSeed;
519}
520
521/****
522*****
523*****  REFILL
524*****
525****/
526
527static void
528assertValidPiece( Torrent * t, tr_piece_index_t piece )
529{
530    assert( t );
531    assert( t->tor );
532    assert( piece < t->tor->info.pieceCount );
533}
534
535static int
536getPieceRequests( Torrent * t, tr_piece_index_t piece )
537{
538    assertValidPiece( t, piece );
539
540    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
541}
542
543static void
544incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
545{
546    assertValidPiece( t, piece );
547
548    if( t->pendingRequestCount == NULL )
549        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
550    t->pendingRequestCount[piece]++;
551}
552
553static void
554decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
555{
556    assertValidPiece( t, piece );
557
558    if( t->pendingRequestCount )
559        t->pendingRequestCount[piece]--;
560}
561
562struct tr_refill_piece
563{
564    tr_priority_t    priority;
565    uint32_t         piece;
566    uint32_t         peerCount;
567    int              random;
568    int              pendingRequestCount;
569    int              missingBlockCount;
570};
571
572static int
573compareRefillPiece( const void * aIn, const void * bIn )
574{
575    const struct tr_refill_piece * a = aIn;
576    const struct tr_refill_piece * b = bIn;
577
578    /* if one piece has a higher priority, it goes first */
579    if( a->priority != b->priority )
580        return a->priority > b->priority ? -1 : 1;
581
582    /* have a per-priority endgame */
583    if( a->pendingRequestCount != b->pendingRequestCount )
584        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
585
586    /* fewer missing pieces goes first */
587    if( a->missingBlockCount != b->missingBlockCount )
588        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
589
590    /* otherwise if one has fewer peers, it goes first */
591    if( a->peerCount != b->peerCount )
592        return a->peerCount < b->peerCount ? -1 : 1;
593
594    /* otherwise go with our random seed */
595    if( a->random != b->random )
596        return a->random < b->random ? -1 : 1;
597
598    return 0;
599}
600
601static tr_piece_index_t *
602getPreferredPieces( Torrent * t, tr_piece_index_t * pieceCount )
603{
604    const tr_torrent  * tor = t->tor;
605    const tr_info     * inf = &tor->info;
606    tr_piece_index_t    i;
607    tr_piece_index_t    poolSize = 0;
608    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
609    int                 peerCount;
610    const tr_peer    ** peers;
611
612    assert( torrentIsLocked( t ) );
613
614    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
615    peerCount = tr_ptrArraySize( &t->peers );
616
617    /* make a list of the pieces that we want but don't have */
618    for( i = 0; i < inf->pieceCount; ++i )
619        if( !tor->info.pieces[i].dnd
620                && !tr_cpPieceIsComplete( &tor->completion, i ) )
621            pool[poolSize++] = i;
622
623    /* sort the pool by which to request next */
624    if( poolSize > 1 )
625    {
626        tr_piece_index_t j;
627        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
628
629        for( j = 0; j < poolSize; ++j )
630        {
631            int k;
632            const tr_piece_index_t piece = pool[j];
633            struct tr_refill_piece * setme = p + j;
634
635            setme->piece = piece;
636            setme->priority = inf->pieces[piece].priority;
637            setme->peerCount = 0;
638            setme->random = tr_cryptoWeakRandInt( INT_MAX );
639            setme->pendingRequestCount = getPieceRequests( t, piece );
640            setme->missingBlockCount
641                         = tr_cpMissingBlocksInPiece( &tor->completion, piece );
642
643            for( k = 0; k < peerCount; ++k )
644            {
645                const tr_peer * peer = peers[k];
646                if( peer->peerIsInterested
647                        && !peer->clientIsChoked
648                        && tr_bitfieldHas( peer->have, piece ) )
649                    ++setme->peerCount;
650            }
651        }
652
653        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
654               compareRefillPiece );
655
656        for( j = 0; j < poolSize; ++j )
657            pool[j] = p[j].piece;
658
659        tr_free( p );
660    }
661
662    *pieceCount = poolSize;
663    return pool;
664}
665
666struct tr_blockIterator
667{
668    Torrent * t;
669    tr_block_index_t blockIndex, blockCount, *blocks;
670    tr_piece_index_t pieceIndex, pieceCount, *pieces;
671};
672
673static struct tr_blockIterator*
674blockIteratorNew( Torrent * t )
675{
676    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
677    i->t = t;
678    i->pieces = getPreferredPieces( t, &i->pieceCount );
679    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCountInPiece );
680    return i;
681}
682
683static int
684blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
685{
686    int found;
687    Torrent * t = i->t;
688    tr_torrent * tor = t->tor;
689
690    while( ( i->blockIndex == i->blockCount )
691        && ( i->pieceIndex < i->pieceCount ) )
692    {
693        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
694        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
695        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
696        tr_block_index_t block;
697
698        assert( index < tor->info.pieceCount );
699
700        i->blockCount = 0;
701        i->blockIndex = 0;
702        for( block=b; block!=e; ++block )
703            if( !tr_cpBlockIsComplete( &tor->completion, block ) )
704                i->blocks[i->blockCount++] = block;
705    }
706
707    assert( i->blockCount <= tor->blockCountInPiece );
708
709    if(( found = ( i->blockIndex < i->blockCount )))
710        *setme = i->blocks[i->blockIndex++];
711
712    return found;
713}
714
715static void
716blockIteratorFree( struct tr_blockIterator * i )
717{
718    tr_free( i->blocks );
719    tr_free( i->pieces );
720    tr_free( i );
721}
722
723static tr_peer**
724getPeersUploadingToClient( Torrent * t,
725                           int *     setmeCount )
726{
727    int j;
728    int peerCount = 0;
729    int retCount = 0;
730    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
731    tr_peer ** ret = tr_new( tr_peer *, peerCount );
732
733    j = 0; /* this is a temporary test to make sure we walk through all the peers */
734    if( peerCount )
735    {
736        /* Get a list of peers we're downloading from.
737           Pick a different starting point each time so all peers
738           get a chance at being the first in line */
739        const int fencepost = tr_cryptoWeakRandInt( peerCount );
740        int i = fencepost;
741        do {
742            if( clientIsDownloadingFrom( peers[i] ) )
743                ret[retCount++] = peers[i];
744            i = ( i + 1 ) % peerCount;
745            ++j;
746        } while( i != fencepost );
747    }
748    assert( j == peerCount );
749    *setmeCount = retCount;
750    return ret;
751}
752
753static uint32_t
754getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
755{
756    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
757    const uint64_t blockPos = tor->blockSize * b;
758    assert( blockPos >= piecePos );
759    return (uint32_t)( blockPos - piecePos );
760}
761
762static int
763refillPulse( void * vtorrent )
764{
765    tr_block_index_t block;
766    int peerCount;
767    int webseedCount;
768    tr_peer ** peers;
769    tr_webseed ** webseeds;
770    struct tr_blockIterator * blockIterator;
771    Torrent * t = vtorrent;
772    tr_torrent * tor = t->tor;
773
774    if( !t->isRunning )
775        return TRUE;
776    if( tr_torrentIsSeed( t->tor ) )
777        return TRUE;
778
779    torrentLock( t );
780    tordbg( t, "Refilling Request Buffers..." );
781
782    blockIterator = blockIteratorNew( t );
783    peers = getPeersUploadingToClient( t, &peerCount );
784    webseedCount = tr_ptrArraySize( &t->webseeds );
785    webseeds = tr_memdup( tr_ptrArrayBase( &t->webseeds ),
786                          webseedCount * sizeof( tr_webseed* ) );
787
788    while( ( webseedCount || peerCount )
789        && blockIteratorNext( blockIterator, &block ) )
790    {
791        int j;
792        int handled = FALSE;
793
794        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
795        const uint32_t offset = getBlockOffsetInPiece( tor, block );
796        const uint32_t length = tr_torBlockCountBytes( tor, block );
797
798        /* find a peer who can ask for this block */
799        for( j=0; !handled && j<peerCount; )
800        {
801            const tr_addreq_t val = tr_peerMsgsAddRequest( peers[j]->msgs, index, offset, length );
802            switch( val )
803            {
804                case TR_ADDREQ_FULL:
805                case TR_ADDREQ_CLIENT_CHOKED:
806                    peers[j] = peers[--peerCount];
807                    break;
808
809                case TR_ADDREQ_MISSING:
810                case TR_ADDREQ_DUPLICATE:
811                    ++j;
812                    break;
813
814                case TR_ADDREQ_OK:
815                    incrementPieceRequests( t, index );
816                    handled = TRUE;
817                    break;
818
819                default:
820                    assert( 0 && "unhandled value" );
821                    break;
822            }
823        }
824
825        /* maybe one of the webseeds can do it */
826        for( j=0; !handled && j<webseedCount; )
827        {
828            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j], index, offset, length );
829            switch( val )
830            {
831                case TR_ADDREQ_FULL:
832                    webseeds[j] = webseeds[--webseedCount];
833                    break;
834
835                case TR_ADDREQ_OK:
836                    incrementPieceRequests( t, index );
837                    handled = TRUE;
838                    break;
839
840                default:
841                    assert( 0 && "unhandled value" );
842                    break;
843            }
844        }
845    }
846
847    /* cleanup */
848    blockIteratorFree( blockIterator );
849    tr_free( webseeds );
850    tr_free( peers );
851
852    t->refillTimer = NULL;
853    torrentUnlock( t );
854    return FALSE;
855}
856
857static void
858broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
859{
860    size_t i;
861    size_t peerCount;
862    tr_peer ** peers;
863
864    assert( torrentIsLocked( t ) );
865
866    tordbg( t, "got a block; cancelling any duplicate requests from peers %"PRIu32":%"PRIu32"->%"PRIu32, index, offset, length );
867
868    peerCount = tr_ptrArraySize( &t->peers );
869    peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
870    for( i=0; i<peerCount; ++i )
871        if( peers[i]->msgs )
872            tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
873}
874
875static void
876addStrike( Torrent * t,
877           tr_peer * peer )
878{
879    tordbg( t, "increasing peer %s strike count to %d",
880            tr_peerIoAddrStr( &peer->addr,
881                              peer->port ), peer->strikes + 1 );
882
883    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
884    {
885        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
886        atom->myflags |= MYFLAG_BANNED;
887        peer->doPurge = 1;
888        tordbg( t, "banning peer %s", tr_peerIoAddrStr( &atom->addr, atom->port ) );
889    }
890}
891
892static void
893gotBadPiece( Torrent *        t,
894             tr_piece_index_t pieceIndex )
895{
896    tr_torrent *   tor = t->tor;
897    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
898
899    tor->corruptCur += byteCount;
900    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
901}
902
903static void
904refillSoon( Torrent * t )
905{
906    if( t->refillTimer == NULL )
907        t->refillTimer = tr_timerNew( t->manager->session,
908                                      refillPulse, t,
909                                      REFILL_PERIOD_MSEC );
910}
911
912static void
913peerSuggestedPiece( Torrent            * t UNUSED,
914                    tr_peer            * peer UNUSED,
915                    tr_piece_index_t     pieceIndex UNUSED,
916                    int                  isFastAllowed UNUSED )
917{
918#if 0
919    assert( t );
920    assert( peer );
921    assert( peer->msgs );
922
923    /* is this a valid piece? */
924    if(  pieceIndex >= t->tor->info.pieceCount )
925        return;
926
927    /* don't ask for it if we've already got it */
928    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
929        return;
930
931    /* don't ask for it if they don't have it */
932    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
933        return;
934
935    /* don't ask for it if we're choked and it's not fast */
936    if( !isFastAllowed && peer->clientIsChoked )
937        return;
938
939    /* request the blocks that we don't have in this piece */
940    {
941        tr_block_index_t block;
942        const tr_torrent * tor = t->tor;
943        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
944        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
945
946        for( block=start; block<end; ++block )
947        {
948            if( !tr_cpBlockIsComplete( tor->completion, block ) )
949            {
950                const uint32_t offset = getBlockOffsetInPiece( tor, block );
951                const uint32_t length = tr_torBlockCountBytes( tor, block );
952                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
953                incrementPieceRequests( t, pieceIndex );
954            }
955        }
956    }
957#endif
958}
959
960static void
961peerCallbackFunc( void * vpeer, void * vevent, void * vt )
962{
963    tr_peer * peer = vpeer; /* may be NULL if peer is a webseed */
964    Torrent * t = vt;
965    const tr_peer_event * e = vevent;
966
967    torrentLock( t );
968
969    switch( e->eventType )
970    {
971        case TR_PEER_UPLOAD_ONLY:
972            /* update our atom */
973            if( peer ) {
974                struct peer_atom * a = getExistingAtom( t, &peer->addr );
975                a->uploadOnly = e->uploadOnly ? UPLOAD_ONLY_YES : UPLOAD_ONLY_NO;
976            }
977            break;
978
979        case TR_PEER_NEED_REQ:
980            refillSoon( t );
981            break;
982
983        case TR_PEER_CANCEL:
984            decrementPieceRequests( t, e->pieceIndex );
985            break;
986
987        case TR_PEER_PEER_GOT_DATA:
988        {
989            const time_t now = time( NULL );
990            tr_torrent * tor = t->tor;
991
992            tor->activityDate = now;
993
994            if( e->wasPieceData )
995                tor->uploadedCur += e->length;
996
997            /* update the stats */
998            if( e->wasPieceData )
999                tr_statsAddUploaded( tor->session, e->length );
1000
1001            /* update our atom */
1002            if( peer ) {
1003                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1004                if( e->wasPieceData )
1005                    a->piece_data_time = now;
1006            }
1007
1008            break;
1009        }
1010
1011        case TR_PEER_CLIENT_GOT_SUGGEST:
1012            if( peer )
1013                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1014            break;
1015
1016        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1017            if( peer )
1018                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1019            break;
1020
1021        case TR_PEER_CLIENT_GOT_DATA:
1022        {
1023            const time_t now = time( NULL );
1024            tr_torrent * tor = t->tor;
1025            tor->activityDate = now;
1026
1027            /* only add this to downloadedCur if we got it from a peer --
1028             * webseeds shouldn't count against our ratio.  As one tracker
1029             * admin put it, "Those pieces are downloaded directly from the
1030             * content distributor, not the peers, it is the tracker's job
1031             * to manage the swarms, not the web server and does not fit
1032             * into the jurisdiction of the tracker." */
1033            if( peer && e->wasPieceData )
1034                tor->downloadedCur += e->length;
1035
1036            /* update the stats */ 
1037            if( e->wasPieceData )
1038                tr_statsAddDownloaded( tor->session, e->length );
1039
1040            /* update our atom */
1041            if( peer ) {
1042                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1043                if( e->wasPieceData )
1044                    a->piece_data_time = now;
1045            }
1046
1047            break;
1048        }
1049
1050        case TR_PEER_PEER_PROGRESS:
1051        {
1052            if( peer )
1053            {
1054                struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1055                const int peerIsSeed = e->progress >= 1.0;
1056                if( peerIsSeed ) {
1057                    tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1058                    atom->flags |= ADDED_F_SEED_FLAG;
1059                } else {
1060                    tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1061                    atom->flags &= ~ADDED_F_SEED_FLAG;
1062                }
1063            }
1064            break;
1065        }
1066
1067        case TR_PEER_CLIENT_GOT_BLOCK:
1068        {
1069            tr_torrent *     tor = t->tor;
1070
1071            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1072
1073            tr_cpBlockAdd( &tor->completion, block );
1074            decrementPieceRequests( t, e->pieceIndex );
1075
1076            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1077
1078            if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1079            {
1080                const tr_piece_index_t p = e->pieceIndex;
1081                const tr_bool ok = tr_ioTestPiece( tor, p, NULL, 0 );
1082
1083                if( !ok )
1084                {
1085                    tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1086                               (unsigned long)p );
1087                }
1088
1089                tr_torrentSetHasPiece( tor, p, ok );
1090                tr_torrentSetPieceChecked( tor, p, TRUE );
1091                tr_peerMgrSetBlame( tor->session->peerMgr, tor->info.hash, p, ok );
1092
1093                if( !ok )
1094                    gotBadPiece( t, p );
1095                else
1096                {
1097                    int i;
1098                    int peerCount = tr_ptrArraySize( &t->peers );
1099                    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1100                    for( i=0; i<peerCount; ++i )
1101                        tr_peerMsgsHave( peers[i]->msgs, p );
1102                }
1103
1104                tr_torrentRecheckCompleteness( tor );
1105            }
1106            break;
1107        }
1108
1109        case TR_PEER_ERROR:
1110            if( e->err == EINVAL )
1111            {
1112                addStrike( t, peer );
1113                peer->doPurge = 1;
1114                tordbg( t, "setting %s doPurge flag because we got an EINVAL error", tr_peerIoAddrStr( &peer->addr, peer->port ) );
1115            }
1116            else if( ( e->err == ERANGE )
1117                  || ( e->err == EMSGSIZE )
1118                  || ( e->err == ENOTCONN ) )
1119            {
1120                /* some protocol error from the peer */
1121                peer->doPurge = 1;
1122                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error", tr_peerIoAddrStr( &peer->addr, peer->port ) );
1123            }
1124            else /* a local error, such as an IO error */
1125            {
1126                t->tor->error = e->err;
1127                tr_strlcpy( t->tor->errorString,
1128                            tr_strerror( t->tor->error ),
1129                            sizeof( t->tor->errorString ) );
1130                tr_torrentStop( t->tor );
1131            }
1132            break;
1133
1134        default:
1135            assert( 0 );
1136    }
1137
1138    torrentUnlock( t );
1139}
1140
1141static void
1142ensureAtomExists( Torrent          * t,
1143                  const tr_address * addr,
1144                  tr_port            port,
1145                  uint8_t            flags,
1146                  uint8_t            from )
1147{
1148    if( getExistingAtom( t, addr ) == NULL )
1149    {
1150        struct peer_atom * a;
1151        a = tr_new0( struct peer_atom, 1 );
1152        a->addr = *addr;
1153        a->port = port;
1154        a->flags = flags;
1155        a->from = from;
1156        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr( &a->addr, a->port ) );
1157        tr_ptrArrayInsertSorted( &t->pool, a, comparePeerAtoms );
1158    }
1159}
1160
1161static int
1162getMaxPeerCount( const tr_torrent * tor )
1163{
1164    return tor->maxConnectedPeers;
1165}
1166
1167static int
1168getPeerCount( const Torrent * t )
1169{
1170    return tr_ptrArraySize( &t->peers );// + tr_ptrArraySize( &t->outgoingHandshakes );
1171}
1172
1173/* FIXME: this is kind of a mess. */
1174static tr_bool
1175myHandshakeDoneCB( tr_handshake  * handshake,
1176                   tr_peerIo     * io,
1177                   int             isConnected,
1178                   const uint8_t * peer_id,
1179                   void          * vmanager )
1180{
1181    tr_bool            ok = isConnected;
1182    tr_bool            success = FALSE;
1183    tr_port            port;
1184    const tr_address * addr;
1185    tr_peerMgr       * manager = vmanager;
1186    Torrent          * t;
1187    tr_handshake     * ours;
1188
1189    assert( io );
1190    assert( isConnected == 0 || isConnected == 1 );
1191
1192    t = tr_peerIoHasTorrentHash( io )
1193        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1194        : NULL;
1195
1196    if( tr_peerIoIsIncoming ( io ) )
1197        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1198                                        handshake, handshakeCompare );
1199    else if( t )
1200        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1201                                        handshake, handshakeCompare );
1202    else
1203        ours = handshake;
1204
1205    assert( ours );
1206    assert( ours == handshake );
1207
1208    if( t )
1209        torrentLock( t );
1210
1211    addr = tr_peerIoGetAddress( io, &port );
1212
1213    if( !ok || !t || !t->isRunning )
1214    {
1215        if( t )
1216        {
1217            struct peer_atom * atom = getExistingAtom( t, addr );
1218            if( atom )
1219                ++atom->numFails;
1220        }
1221    }
1222    else /* looking good */
1223    {
1224        struct peer_atom * atom;
1225        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1226        atom = getExistingAtom( t, addr );
1227        atom->time = time( NULL );
1228        atom->piece_data_time = 0;
1229
1230        if( atom->myflags & MYFLAG_BANNED )
1231        {
1232            tordbg( t, "banned peer %s tried to reconnect",
1233                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
1234        }
1235        else if( tr_peerIoIsIncoming( io )
1236               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1237
1238        {
1239        }
1240        else
1241        {
1242            tr_peer * peer = getExistingPeer( t, addr );
1243
1244            if( peer ) /* we already have this peer */
1245            {
1246            }
1247            else
1248            {
1249                peer = getPeer( t, addr );
1250                tr_free( peer->client );
1251
1252                if( !peer_id )
1253                    peer->client = NULL;
1254                else {
1255                    char client[128];
1256                    tr_clientForId( client, sizeof( client ), peer_id );
1257                    peer->client = tr_strdup( client );
1258                }
1259
1260                peer->port = port;
1261                peer->io = tr_handshakeStealIO( handshake );
1262                tr_peerIoRef( peer->io ); /* balanced by the unref in peerDestructor() */
1263                tr_peerIoSetParent( peer->io, t->tor->bandwidth );
1264                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1265
1266                success = TRUE;
1267            }
1268        }
1269    }
1270
1271    if( t )
1272        torrentUnlock( t );
1273
1274    return success;
1275}
1276
1277void
1278tr_peerMgrAddIncoming( tr_peerMgr * manager,
1279                       tr_address * addr,
1280                       tr_port      port,
1281                       int          socket )
1282{
1283    managerLock( manager );
1284
1285    if( tr_sessionIsAddressBlocked( manager->session, addr ) )
1286    {
1287        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
1288        tr_netClose( socket );
1289    }
1290    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
1291    {
1292        tr_netClose( socket );
1293    }
1294    else /* we don't have a connetion to them yet... */
1295    {
1296        tr_peerIo *    io;
1297        tr_handshake * handshake;
1298
1299        io = tr_peerIoNewIncoming( manager->session, manager->session->bandwidth, addr, port, socket );
1300
1301        handshake = tr_handshakeNew( io,
1302                                     manager->session->encryptionMode,
1303                                     myHandshakeDoneCB,
1304                                     manager );
1305
1306        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
1307
1308        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
1309                                 handshakeCompare );
1310    }
1311
1312    managerUnlock( manager );
1313}
1314
1315static tr_bool
1316tr_isPex( const tr_pex * pex )
1317{
1318    return pex && tr_isAddress( &pex->addr );
1319}
1320
1321void
1322tr_peerMgrAddPex( tr_peerMgr *    manager,
1323                  const uint8_t * torrentHash,
1324                  uint8_t         from,
1325                  const tr_pex *  pex )
1326{
1327    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
1328    {
1329        Torrent * t;
1330        managerLock( manager );
1331
1332        t = getExistingTorrent( manager, torrentHash );
1333        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
1334            ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from );
1335
1336        managerUnlock( manager );
1337    }
1338}
1339
1340tr_pex *
1341tr_peerMgrCompactToPex( const void *    compact,
1342                        size_t          compactLen,
1343                        const uint8_t * added_f,
1344                        size_t          added_f_len,
1345                        size_t *        pexCount )
1346{
1347    size_t          i;
1348    size_t          n = compactLen / 6;
1349    const uint8_t * walk = compact;
1350    tr_pex *        pex = tr_new0( tr_pex, n );
1351
1352    for( i = 0; i < n; ++i )
1353    {
1354        pex[i].addr.type = TR_AF_INET;
1355        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
1356        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1357        if( added_f && ( n == added_f_len ) )
1358            pex[i].flags = added_f[i];
1359    }
1360
1361    *pexCount = n;
1362    return pex;
1363}
1364
1365tr_pex *
1366tr_peerMgrCompact6ToPex( const void    * compact,
1367                         size_t          compactLen,
1368                         const uint8_t * added_f,
1369                         size_t          added_f_len,
1370                         size_t        * pexCount )
1371{
1372    size_t          i;
1373    size_t          n = compactLen / 18;
1374    const uint8_t * walk = compact;
1375    tr_pex *        pex = tr_new0( tr_pex, n );
1376   
1377    for( i = 0; i < n; ++i )
1378    {
1379        pex[i].addr.type = TR_AF_INET6;
1380        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
1381        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1382        if( added_f && ( n == added_f_len ) )
1383            pex[i].flags = added_f[i];
1384    }
1385   
1386    *pexCount = n;
1387    return pex;
1388}
1389
1390tr_pex *
1391tr_peerMgrArrayToPex( const void * array,
1392                      size_t       arrayLen,
1393                      size_t      * pexCount )
1394{
1395    size_t          i;
1396    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
1397    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
1398    const uint8_t * walk = array;
1399    tr_pex        * pex = tr_new0( tr_pex, n );
1400   
1401    for( i = 0 ; i < n ; i++ ) {
1402        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
1403        tr_suspectAddress( &pex[i].addr, "tracker"  );
1404        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
1405        pex[i].flags = 0x00;
1406        walk += sizeof( tr_address ) + 2;
1407    }
1408   
1409    *pexCount = n;
1410    return pex;
1411}
1412
1413/**
1414***
1415**/
1416
1417void
1418tr_peerMgrSetBlame( tr_peerMgr *     manager,
1419                    const uint8_t *  torrentHash,
1420                    tr_piece_index_t pieceIndex,
1421                    int              success )
1422{
1423    if( !success )
1424    {
1425        int        peerCount, i;
1426        Torrent *  t = getExistingTorrent( manager, torrentHash );
1427        tr_peer ** peers;
1428
1429        assert( torrentIsLocked( t ) );
1430
1431        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
1432        for( i = 0; i < peerCount; ++i )
1433        {
1434            tr_peer * peer = peers[i];
1435            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1436            {
1437                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1438                        tr_peerIoAddrStr( &peer->addr, peer->port ),
1439                        pieceIndex, (int)peer->strikes + 1 );
1440                addStrike( t, peer );
1441            }
1442        }
1443    }
1444}
1445
1446int
1447tr_pexCompare( const void * va, const void * vb )
1448{
1449    const tr_pex * a = va;
1450    const tr_pex * b = vb;
1451    int i;
1452
1453    assert( tr_isPex( a ) );
1454    assert( tr_isPex( b ) );
1455
1456    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
1457        return i;
1458
1459    if( a->port != b->port )
1460        return a->port < b->port ? -1 : 1;
1461
1462    return 0;
1463}
1464
1465static int
1466peerPrefersCrypto( const tr_peer * peer )
1467{
1468    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1469        return TRUE;
1470
1471    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1472        return FALSE;
1473
1474    return tr_peerIoIsEncrypted( peer->io );
1475}
1476
1477int
1478tr_peerMgrGetPeers( tr_peerMgr      * manager,
1479                    const uint8_t   * torrentHash,
1480                    tr_pex         ** setme_pex,
1481                    uint8_t           af)
1482{
1483    int peersReturning = 0;
1484    const Torrent *  t;
1485
1486    managerLock( manager );
1487
1488    t = getExistingTorrent( manager, torrentHash );
1489    if( t == NULL )
1490    {
1491        *setme_pex = NULL;
1492    }
1493    else
1494    {
1495        int i;
1496        const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1497        const int peerCount = tr_ptrArraySize( &t->peers );
1498        /* for now, this will waste memory on torrents that have both
1499         * ipv6 and ipv4 peers */
1500        tr_pex * pex = tr_new( tr_pex, peerCount );
1501        tr_pex * walk = pex;
1502
1503        for( i=0; i<peerCount; ++i )
1504        {
1505            const tr_peer * peer = peers[i];
1506            if( peer->addr.type == af )
1507            {
1508                const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1509
1510                assert( tr_isAddress( &peer->addr ) );
1511                walk->addr = peer->addr;
1512                walk->port = peer->port;
1513                walk->flags = 0;
1514                if( peerPrefersCrypto( peer ) )
1515                    walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1516                if( ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 ) )
1517                    walk->flags |= ADDED_F_SEED_FLAG;
1518                ++peersReturning;
1519                ++walk;
1520            }
1521        }
1522
1523        assert( ( walk - pex ) == peersReturning );
1524        qsort( pex, peersReturning, sizeof( tr_pex ), tr_pexCompare );
1525        *setme_pex = pex;
1526    }
1527
1528    managerUnlock( manager );
1529    return peersReturning;
1530}
1531
1532static int reconnectPulse( void * vtorrent );
1533
1534static int rechokePulse( void * vtorrent );
1535
1536void
1537tr_peerMgrStartTorrent( tr_peerMgr *    manager,
1538                        const uint8_t * torrentHash )
1539{
1540    Torrent * t;
1541
1542    managerLock( manager );
1543
1544    t = getExistingTorrent( manager, torrentHash );
1545
1546    assert( t );
1547    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1548    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1549
1550    if( !t->isRunning )
1551    {
1552        t->isRunning = 1;
1553
1554        t->reconnectTimer = tr_timerNew( t->manager->session,
1555                                         reconnectPulse, t,
1556                                         RECONNECT_PERIOD_MSEC );
1557
1558        t->rechokeTimer = tr_timerNew( t->manager->session,
1559                                       rechokePulse, t,
1560                                       RECHOKE_PERIOD_MSEC );
1561
1562        reconnectPulse( t );
1563
1564        rechokePulse( t );
1565
1566        if( !tr_ptrArrayEmpty( &t->webseeds ) )
1567            refillSoon( t );
1568    }
1569
1570    managerUnlock( manager );
1571}
1572
1573static void
1574stopTorrent( Torrent * t )
1575{
1576    assert( torrentIsLocked( t ) );
1577
1578    t->isRunning = 0;
1579    tr_timerFree( &t->rechokeTimer );
1580    tr_timerFree( &t->reconnectTimer );
1581
1582    /* disconnect the peers. */
1583    tr_ptrArrayForeach( &t->peers, (PtrArrayForeachFunc)peerDestructor );
1584    tr_ptrArrayClear( &t->peers );
1585
1586    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1587     * which removes the handshake from t->outgoingHandshakes... */
1588    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
1589        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
1590}
1591
1592void
1593tr_peerMgrStopTorrent( tr_peerMgr *    manager,
1594                       const uint8_t * torrentHash )
1595{
1596    managerLock( manager );
1597
1598    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1599
1600    managerUnlock( manager );
1601}
1602
1603void
1604tr_peerMgrAddTorrent( tr_peerMgr * manager,
1605                      tr_torrent * tor )
1606{
1607    Torrent * t;
1608
1609    managerLock( manager );
1610
1611    assert( tor );
1612    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1613
1614    t = torrentConstructor( manager, tor );
1615    tr_ptrArrayInsertSorted( &manager->torrents, t, torrentCompare );
1616
1617    managerUnlock( manager );
1618}
1619
1620void
1621tr_peerMgrRemoveTorrent( tr_peerMgr *    manager,
1622                         const uint8_t * torrentHash )
1623{
1624    Torrent * t;
1625
1626    managerLock( manager );
1627
1628    t = getExistingTorrent( manager, torrentHash );
1629    assert( t );
1630    stopTorrent( t );
1631    tr_ptrArrayRemoveSorted( &manager->torrents, t, torrentCompare );
1632    torrentDestructor( t );
1633
1634    managerUnlock( manager );
1635}
1636
1637void
1638tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1639                               const uint8_t *    torrentHash,
1640                               int8_t *           tab,
1641                               unsigned int       tabCount )
1642{
1643    tr_piece_index_t   i;
1644    const Torrent *    t;
1645    const tr_torrent * tor;
1646    float              interval;
1647    tr_bool            isSeed;
1648    int                peerCount;
1649    const tr_peer **   peers;
1650    managerLock( manager );
1651
1652    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1653    tor = t->tor;
1654    interval = tor->info.pieceCount / (float)tabCount;
1655    isSeed = tor && ( tr_cpGetStatus ( &tor->completion ) == TR_SEED );
1656    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1657    peerCount = tr_ptrArraySize( &t->peers );
1658
1659    memset( tab, 0, tabCount );
1660
1661    for( i = 0; tor && i < tabCount; ++i )
1662    {
1663        const int piece = i * interval;
1664
1665        if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
1666            tab[i] = -1;
1667        else if( peerCount ) {
1668            int j;
1669            for( j = 0; j < peerCount; ++j )
1670                if( tr_bitfieldHas( peers[j]->have, i ) )
1671                    ++tab[i];
1672        }
1673    }
1674
1675    managerUnlock( manager );
1676}
1677
1678/* Returns the pieces that are available from peers */
1679tr_bitfield*
1680tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1681                        const uint8_t *    torrentHash )
1682{
1683    int i;
1684    int peerCount;
1685    Torrent * t;
1686    const tr_peer ** peers;
1687    tr_bitfield * pieces;
1688    managerLock( manager );
1689
1690    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1691    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1692    peerCount = tr_ptrArraySize( &t->peers );
1693    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1694    for( i=0; i<peerCount; ++i )
1695        tr_bitfieldOr( pieces, peers[i]->have );
1696
1697    managerUnlock( manager );
1698    return pieces;
1699}
1700
1701int
1702tr_peerMgrHasConnections( const tr_peerMgr * manager,
1703                          const uint8_t *    torrentHash )
1704{
1705    int ret;
1706    const Torrent * t;
1707    managerLock( manager );
1708
1709    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1710    ret = t && ( !tr_ptrArrayEmpty( &t->peers ) || !tr_ptrArrayEmpty( &t->webseeds ) );
1711
1712    managerUnlock( manager );
1713    return ret;
1714}
1715
1716void
1717tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1718                        const uint8_t    * torrentHash,
1719                        int              * setmePeersKnown,
1720                        int              * setmePeersConnected,
1721                        int              * setmeSeedsConnected,
1722                        int              * setmeWebseedsSendingToUs,
1723                        int              * setmePeersSendingToUs,
1724                        int              * setmePeersGettingFromUs,
1725                        int              * setmePeersFrom )
1726{
1727    int i, size;
1728    const Torrent * t;
1729    const tr_peer ** peers;
1730    const tr_webseed ** webseeds;
1731
1732    managerLock( manager );
1733
1734    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1735    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1736    size = tr_ptrArraySize( &t->peers );
1737
1738    *setmePeersKnown           = tr_ptrArraySize( &t->pool );
1739    *setmePeersConnected       = 0;
1740    *setmeSeedsConnected       = 0;
1741    *setmePeersGettingFromUs   = 0;
1742    *setmePeersSendingToUs     = 0;
1743    *setmeWebseedsSendingToUs  = 0;
1744
1745    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1746        setmePeersFrom[i] = 0;
1747
1748    for( i=0; i<size; ++i )
1749    {
1750        const tr_peer * peer = peers[i];
1751        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1752
1753        if( peer->io == NULL ) /* not connected */
1754            continue;
1755
1756        ++*setmePeersConnected;
1757
1758        ++setmePeersFrom[atom->from];
1759
1760        if( clientIsDownloadingFrom( peer ) )
1761            ++*setmePeersSendingToUs;
1762
1763        if( clientIsUploadingTo( peer ) )
1764            ++*setmePeersGettingFromUs;
1765
1766        if( atom->flags & ADDED_F_SEED_FLAG )
1767            ++*setmeSeedsConnected;
1768    }
1769
1770    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1771    size = tr_ptrArraySize( &t->webseeds );
1772    for( i=0; i<size; ++i )
1773        if( tr_webseedIsActive( webseeds[i] ) )
1774            ++*setmeWebseedsSendingToUs;
1775
1776    managerUnlock( manager );
1777}
1778
1779float*
1780tr_peerMgrWebSpeeds( const tr_peerMgr * manager,
1781                     const uint8_t *    torrentHash )
1782{
1783    const Torrent * t;
1784    const tr_webseed ** webseeds;
1785    int i;
1786    int webseedCount;
1787    float * ret;
1788    uint64_t now;
1789
1790    assert( manager );
1791    managerLock( manager );
1792
1793    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1794    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1795    webseedCount = tr_ptrArraySize( &t->webseeds );
1796    assert( webseedCount == t->tor->info.webseedCount );
1797    ret = tr_new0( float, webseedCount );
1798    now = tr_date( );
1799
1800    for( i=0; i<webseedCount; ++i )
1801        if( !tr_webseedGetSpeed( webseeds[i], now, &ret[i] ) )
1802            ret[i] = -1.0;
1803
1804    managerUnlock( manager );
1805    return ret;
1806}
1807
1808double
1809tr_peerGetPieceSpeed( const tr_peer * peer, uint64_t now, tr_direction direction )
1810{
1811    return peer->io ? tr_peerIoGetPieceSpeed( peer->io, now, direction ) : 0.0;
1812}
1813
1814
1815struct tr_peer_stat *
1816tr_peerMgrPeerStats( const   tr_peerMgr  * manager,
1817                     const   uint8_t     * torrentHash,
1818                     int                 * setmeCount )
1819{
1820    int i, size;
1821    const Torrent * t;
1822    const tr_peer ** peers;
1823    tr_peer_stat * ret;
1824    uint64_t now;
1825
1826    assert( manager );
1827    managerLock( manager );
1828
1829    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1830    size = tr_ptrArraySize( &t->peers );
1831    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1832    ret = tr_new0( tr_peer_stat, size );
1833    now = tr_date( );
1834
1835    for( i = 0; i < size; ++i )
1836    {
1837        char *                   pch;
1838        const tr_peer *          peer = peers[i];
1839        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1840        tr_peer_stat *           stat = ret + i;
1841        tr_address               norm_addr;
1842
1843        norm_addr = peer->addr;
1844        tr_normalizeV4Mapped( &norm_addr );
1845        tr_ntop( &norm_addr, stat->addr, sizeof( stat->addr ) );
1846        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
1847                   sizeof( stat->client ) );
1848        stat->port               = ntohs( peer->port );
1849        stat->from               = atom->from;
1850        stat->progress           = peer->progress;
1851        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1852        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER );
1853        stat->rateToClient       = tr_peerGetPieceSpeed( peer, now, TR_PEER_TO_CLIENT );
1854        stat->peerIsChoked       = peer->peerIsChoked;
1855        stat->peerIsInterested   = peer->peerIsInterested;
1856        stat->clientIsChoked     = peer->clientIsChoked;
1857        stat->clientIsInterested = peer->clientIsInterested;
1858        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1859        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1860        stat->isUploadingTo      = clientIsUploadingTo( peer );
1861        stat->isSeed             = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
1862
1863        pch = stat->flagStr;
1864        if( t->optimistic == peer ) *pch++ = 'O';
1865        if( stat->isDownloadingFrom ) *pch++ = 'D';
1866        else if( stat->clientIsInterested ) *pch++ = 'd';
1867        if( stat->isUploadingTo ) *pch++ = 'U';
1868        else if( stat->peerIsInterested ) *pch++ = 'u';
1869        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
1870        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1871        if( stat->isEncrypted ) *pch++ = 'E';
1872        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1873        if( stat->isIncoming ) *pch++ = 'I';
1874        *pch = '\0';
1875    }
1876
1877    *setmeCount = size;
1878
1879    managerUnlock( manager );
1880    return ret;
1881}
1882
1883/**
1884***
1885**/
1886
1887struct ChokeData
1888{
1889    tr_bool         doUnchoke;
1890    tr_bool         isInterested;
1891    tr_bool         isChoked;
1892    int             rate;
1893    tr_peer *       peer;
1894};
1895
1896static int
1897compareChoke( const void * va,
1898              const void * vb )
1899{
1900    const struct ChokeData * a = va;
1901    const struct ChokeData * b = vb;
1902
1903    if( a->rate != b->rate ) /* prefer higher overall speeds */
1904        return a->rate > b->rate ? -1 : 1;
1905
1906    if( a->isChoked != b->isChoked ) /* prefer unchoked */
1907        return a->isChoked ? 1 : -1;
1908
1909    return 0;
1910}
1911
1912static int
1913isNew( const tr_peer * peer )
1914{
1915    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1916}
1917
1918static int
1919isSame( const tr_peer * peer )
1920{
1921    return peer && peer->client && strstr( peer->client, "Transmission" );
1922}
1923
1924/**
1925***
1926**/
1927
1928static void
1929rechoke( Torrent * t )
1930{
1931    int i, size, unchokedInterested;
1932    const int peerCount = tr_ptrArraySize( &t->peers );
1933    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1934    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1935    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
1936    const uint64_t now = tr_date( );
1937
1938    assert( torrentIsLocked( t ) );
1939
1940    /* sort the peers by preference and rate */
1941    for( i = 0, size = 0; i < peerCount; ++i )
1942    {
1943        tr_peer * peer = peers[i];
1944        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1945
1946        if( peer->progress >= 1.0 ) /* choke all seeds */
1947        {
1948            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1949        }
1950        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
1951        {
1952            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1953        }
1954        else if( chokeAll ) /* choke everyone if we're not uploading */
1955        {
1956            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1957        }
1958        else
1959        {
1960            struct ChokeData * n = &choke[size++];
1961            n->peer         = peer;
1962            n->isInterested = peer->peerIsInterested;
1963            n->isChoked     = peer->peerIsChoked;
1964            n->rate         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER ) * 1024;
1965        }
1966    }
1967
1968    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
1969
1970    /**
1971     * Reciprocation and number of uploads capping is managed by unchoking
1972     * the N peers which have the best upload rate and are interested.
1973     * This maximizes the client's download rate. These N peers are
1974     * referred to as downloaders, because they are interested in downloading
1975     * from the client.
1976     *
1977     * Peers which have a better upload rate (as compared to the downloaders)
1978     * but aren't interested get unchoked. If they become interested, the
1979     * downloader with the worst upload rate gets choked. If a client has
1980     * a complete file, it uses its upload rate rather than its download
1981     * rate to decide which peers to unchoke.
1982     */
1983    unchokedInterested = 0;
1984    for( i=0; i<size && unchokedInterested<MAX_UNCHOKED_PEERS; ++i ) {
1985        choke[i].doUnchoke = 1;
1986        if( choke[i].isInterested )
1987            ++unchokedInterested;
1988    }
1989
1990    /* optimistic unchoke */
1991    if( i < size )
1992    {
1993        int n;
1994        struct ChokeData * c;
1995        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
1996
1997        for( ; i<size; ++i )
1998        {
1999            if( choke[i].isInterested )
2000            {
2001                const tr_peer * peer = choke[i].peer;
2002                int x = 1, y;
2003                if( isNew( peer ) ) x *= 3;
2004                if( isSame( peer ) ) x *= 3;
2005                for( y=0; y<x; ++y )
2006                    tr_ptrArrayAppend( &randPool, &choke[i] );
2007            }
2008        }
2009
2010        if(( n = tr_ptrArraySize( &randPool )))
2011        {
2012            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
2013            c->doUnchoke = 1;
2014            t->optimistic = c->peer;
2015        }
2016
2017        tr_ptrArrayDestruct( &randPool, NULL );
2018    }
2019
2020    for( i=0; i<size; ++i )
2021        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
2022
2023    /* cleanup */
2024    tr_free( choke );
2025}
2026
2027static int
2028rechokePulse( void * vtorrent )
2029{
2030    Torrent * t = vtorrent;
2031
2032    torrentLock( t );
2033    rechoke( t );
2034    torrentUnlock( t );
2035    return TRUE;
2036}
2037
2038/***
2039****
2040****  Life and Death
2041****
2042***/
2043
2044static int
2045shouldPeerBeClosed( const Torrent * t,
2046                    const tr_peer * peer,
2047                    int             peerCount )
2048{
2049    const tr_torrent *       tor = t->tor;
2050    const time_t             now = time( NULL );
2051    const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
2052
2053    /* if it's marked for purging, close it */
2054    if( peer->doPurge )
2055    {
2056        tordbg( t, "purging peer %s because its doPurge flag is set",
2057                tr_peerIoAddrStr( &atom->addr, atom->port ) );
2058        return TRUE;
2059    }
2060
2061    /* if we're seeding and the peer has everything we have,
2062     * and enough time has passed for a pex exchange, then disconnect */
2063    if( tr_torrentIsSeed( tor ) )
2064    {
2065        int peerHasEverything;
2066        if( atom->flags & ADDED_F_SEED_FLAG )
2067            peerHasEverything = TRUE;
2068        else if( peer->progress < tr_cpPercentDone( &tor->completion ) )
2069            peerHasEverything = FALSE;
2070        else {
2071            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( &tor->completion ) );
2072            tr_bitfieldDifference( tmp, peer->have );
2073            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2074            tr_bitfieldFree( tmp );
2075        }
2076
2077        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 )))
2078        {
2079            tordbg( t, "purging peer %s because we're both seeds",
2080                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
2081            return TRUE;
2082        }
2083    }
2084
2085    /* disconnect if it's been too long since piece data has been transferred.
2086     * this is on a sliding scale based on number of available peers... */
2087    {
2088        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2089        /* if we have >= relaxIfFewerThan, strictness is 100%.
2090         * if we have zero connections, strictness is 0% */
2091        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
2092            ? 1.0
2093            : peerCount / (float)relaxStrictnessIfFewerThanN;
2094        const int lo = MIN_UPLOAD_IDLE_SECS;
2095        const int hi = MAX_UPLOAD_IDLE_SECS;
2096        const int limit = hi - ( ( hi - lo ) * strictness );
2097        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
2098/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
2099        if( idleTime > limit ) {
2100            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
2101                       tr_peerIoAddrStr( &atom->addr, atom->port ), idleTime );
2102            return TRUE;
2103        }
2104    }
2105
2106    return FALSE;
2107}
2108
2109static tr_peer **
2110getPeersToClose( Torrent * t, int * setmeSize )
2111{
2112    int i, peerCount, outsize;
2113    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
2114    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2115
2116    assert( torrentIsLocked( t ) );
2117
2118    for( i = outsize = 0; i < peerCount; ++i )
2119        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
2120            ret[outsize++] = peers[i];
2121
2122    *setmeSize = outsize;
2123    return ret;
2124}
2125
2126static int
2127compareCandidates( const void * va,
2128                   const void * vb )
2129{
2130    const struct peer_atom * a = *(const struct peer_atom**) va;
2131    const struct peer_atom * b = *(const struct peer_atom**) vb;
2132
2133    /* <Charles> Here we would probably want to try reconnecting to
2134     * peers that had most recently given us data. Lots of users have
2135     * trouble with resets due to their routers and/or ISPs. This way we
2136     * can quickly recover from an unwanted reset. So we sort
2137     * piece_data_time in descending order.
2138     */
2139
2140    if( a->piece_data_time != b->piece_data_time )
2141        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2142
2143    if( a->numFails != b->numFails )
2144        return a->numFails < b->numFails ? -1 : 1;
2145
2146    if( a->time != b->time )
2147        return a->time < b->time ? -1 : 1;
2148
2149    /* all other things being equal, prefer peers whose
2150     * information comes from a more reliable source */
2151    if( a->from != b->from )
2152        return a->from < b->from ? -1 : 1;
2153
2154    return 0;
2155}
2156
2157static int
2158getReconnectIntervalSecs( const struct peer_atom * atom )
2159{
2160    int          sec;
2161    const time_t now = time( NULL );
2162
2163    /* if we were recently connected to this peer and transferring piece
2164     * data, try to reconnect to them sooner rather that later -- we don't
2165     * want network troubles to get in the way of a good peer. */
2166    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2167        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2168
2169    /* don't allow reconnects more often than our minimum */
2170    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2171        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2172
2173    /* otherwise, the interval depends on how many times we've tried
2174     * and failed to connect to the peer */
2175    else switch( atom->numFails ) {
2176        case 0: sec = 0; break;
2177        case 1: sec = 5; break;
2178        case 2: sec = 2 * 60; break;
2179        case 3: sec = 15 * 60; break;
2180        case 4: sec = 30 * 60; break;
2181        case 5: sec = 60 * 60; break;
2182        default: sec = 120 * 60; break;
2183    }
2184
2185    return sec;
2186}
2187
2188static struct peer_atom **
2189getPeerCandidates( Torrent * t, int * setmeSize )
2190{
2191    int                 i, atomCount, retCount;
2192    struct peer_atom ** atoms;
2193    struct peer_atom ** ret;
2194    const time_t        now = time( NULL );
2195    const int           seed = tr_torrentIsSeed( t->tor );
2196
2197    assert( torrentIsLocked( t ) );
2198
2199    atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
2200    ret = tr_new( struct peer_atom*, atomCount );
2201    for( i = retCount = 0; i < atomCount; ++i )
2202    {
2203        int                interval;
2204        struct peer_atom * atom = atoms[i];
2205
2206        /* peer fed us too much bad data ... we only keep it around
2207         * now to weed it out in case someone sends it to us via pex */
2208        if( atom->myflags & MYFLAG_BANNED )
2209            continue;
2210
2211        /* peer was unconnectable before, so we're not going to keep trying.
2212         * this is needs a separate flag from `banned', since if they try
2213         * to connect to us later, we'll let them in */
2214        if( atom->myflags & MYFLAG_UNREACHABLE )
2215            continue;
2216
2217        /* we don't need two connections to the same peer... */
2218        if( peerIsInUse( t, &atom->addr ) )
2219            continue;
2220
2221        /* no need to connect if we're both seeds... */
2222        if( seed && ( ( atom->flags & ADDED_F_SEED_FLAG ) ||
2223                      ( atom->uploadOnly == UPLOAD_ONLY_YES ) ) )
2224            continue;
2225
2226        /* don't reconnect too often */
2227        interval = getReconnectIntervalSecs( atom );
2228        if( ( now - atom->time ) < interval )
2229        {
2230            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2231                    i, tr_peerIoAddrStr( &atom->addr, atom->port ), interval );
2232            continue;
2233        }
2234
2235        /* Don't connect to peers in our blocklist */
2236        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2237            continue;
2238
2239        ret[retCount++] = atom;
2240    }
2241
2242    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2243    *setmeSize = retCount;
2244    return ret;
2245}
2246
2247static int
2248reconnectPulse( void * vtorrent )
2249{
2250    Torrent *     t = vtorrent;
2251    static time_t prevTime = 0;
2252    static int    newConnectionsThisSecond = 0;
2253    time_t        now;
2254
2255    torrentLock( t );
2256
2257    now = time( NULL );
2258    if( prevTime != now )
2259    {
2260        prevTime = now;
2261        newConnectionsThisSecond = 0;
2262    }
2263
2264    if( !t->isRunning )
2265    {
2266        removeAllPeers( t );
2267    }
2268    else
2269    {
2270        int i, nCandidates, nBad;
2271        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
2272        struct tr_peer ** connections = getPeersToClose( t, &nBad );
2273
2274        //if( nBad || nCandidates )
2275            tordbg( t, "reconnect pulse for [%s]: %d bad connections, "
2276                    "%d connection candidates, %d atoms, max per pulse is %d",
2277                    t->tor->info.name, nBad, nCandidates,
2278                    tr_ptrArraySize( &t->pool ),
2279                    (int)MAX_RECONNECTIONS_PER_PULSE );
2280
2281        /* disconnect some peers.
2282           if we transferred piece data, then they might be good peers,
2283           so reset their `numFails' weight to zero.  otherwise we connected
2284           to them fruitlessly, so mark it as another fail */
2285        for( i = 0; i < nBad; ++i ) {
2286            tr_peer * peer = connections[i];
2287            struct peer_atom * atom = getExistingAtom( t, &peer->addr );
2288            if( atom->piece_data_time )
2289                atom->numFails = 0;
2290            else
2291                ++atom->numFails;
2292            tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
2293            removePeer( t, peer );
2294        }
2295
2296tordbg( t, "nCandidates is %d, MAX_RECONNECTIONS_PER_PULSE is %d, getPeerCount(t) is %d, getMaxPeerCount(t) is %d, newConnectionsThisSecond is %d, MAX_CONNECTIONS_PER_SECOND is %d",
2297        (int)nCandidates, (int)MAX_RECONNECTIONS_PER_PULSE, (int)getPeerCount( t ), (int)getMaxPeerCount( t->tor ), (int)newConnectionsThisSecond, (int)MAX_CONNECTIONS_PER_SECOND );
2298
2299        /* add some new ones */
2300        for( i = 0;    ( i < nCandidates )
2301           && ( i < MAX_RECONNECTIONS_PER_PULSE )
2302           && ( getPeerCount( t ) < getMaxPeerCount( t->tor ) )
2303           && ( newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND ); ++i )
2304        {
2305            tr_peerMgr *       mgr = t->manager;
2306            struct peer_atom * atom = candidates[i];
2307            tr_peerIo *        io;
2308
2309            tordbg( t, "Starting an OUTGOING connection with %s",
2310                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2311
2312            io = tr_peerIoNewOutgoing( mgr->session, mgr->session->bandwidth, &atom->addr, atom->port, t->hash );
2313
2314            if( io == NULL )
2315            {
2316                tordbg( t, "peerIo not created; marking peer %s as unreachable",
2317                        tr_peerIoAddrStr( &atom->addr, atom->port ) );
2318                atom->myflags |= MYFLAG_UNREACHABLE;
2319            }
2320            else
2321            {
2322                tr_handshake * handshake = tr_handshakeNew( io,
2323                                                            mgr->session->encryptionMode,
2324                                                            myHandshakeDoneCB,
2325                                                            mgr );
2326
2327                assert( tr_peerIoGetTorrentHash( io ) );
2328
2329                tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewOutgoing() */
2330
2331                ++newConnectionsThisSecond;
2332
2333                tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
2334                                         handshakeCompare );
2335            }
2336
2337            atom->time = time( NULL );
2338        }
2339
2340        /* cleanup */
2341        tr_free( connections );
2342        tr_free( candidates );
2343    }
2344
2345    torrentUnlock( t );
2346    return TRUE;
2347}
2348
2349/****
2350*****
2351*****  BANDWIDTH ALLOCATION
2352*****
2353****/
2354
2355static void
2356pumpAllPeers( tr_peerMgr * mgr )
2357{
2358    const int torrentCount = tr_ptrArraySize( &mgr->torrents );
2359    int       i, j;
2360
2361    for( i=0; i<torrentCount; ++i )
2362    {
2363        Torrent * t = tr_ptrArrayNth( &mgr->torrents, i );
2364        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
2365        {
2366            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
2367            tr_peerMsgsPulse( peer->msgs );
2368        }
2369    }
2370}
2371
2372static int
2373bandwidthPulse( void * vmgr )
2374{
2375    tr_peerMgr * mgr = vmgr;
2376    managerLock( mgr );
2377
2378    /* FIXME: this next line probably isn't necessary... */
2379    pumpAllPeers( mgr );
2380
2381    /* allocate bandwidth to the peers */
2382    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
2383    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
2384
2385    managerUnlock( mgr );
2386    return TRUE;
2387}
Note: See TracBrowser for help on using the repository browser.