source: trunk/libtransmission/peer-mgr.c @ 7109

Last change on this file since 7109 was 7109, checked in by charles, 12 years ago

(libT) some people have complained that Transmission doesn't upload enough until after downloading is complete. This commit ought to fix that.

  • Property svn:keywords set to Date Rev Author Id
File size: 73.7 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 7109 2008-11-15 00:46:51Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h> /* memcpy, memcmp, strstr */
16#include <stdlib.h> /* qsort */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "blocklist.h"
23#include "clients.h"
24#include "completion.h"
25#include "crypto.h"
26#include "handshake.h"
27#include "inout.h" /* tr_ioTestPiece */
28#include "net.h"
29#include "peer-io.h"
30#include "peer-mgr.h"
31#include "peer-mgr-private.h"
32#include "peer-msgs.h"
33#include "ptrarray.h"
34#include "ratecontrol.h"
35#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
36#include "torrent.h"
37#include "trevent.h"
38#include "utils.h"
39#include "webseed.h"
40
41enum
42{
43    /* how frequently to change which peers are choked */
44    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
45
46    /* minimum interval for refilling peers' request lists */
47    REFILL_PERIOD_MSEC = 333,
48
49    /* when many peers are available, keep idle ones this long */
50    MIN_UPLOAD_IDLE_SECS = ( 60 * 3 ),
51
52    /* when few peers are available, keep idle ones this long */
53    MAX_UPLOAD_IDLE_SECS = ( 60 * 10 ),
54
55    /* how frequently to decide which peers live and die */
56    RECONNECT_PERIOD_MSEC = ( 2 * 1000 ),
57   
58    /* how frequently to reallocate bandwidth */
59    BANDWIDTH_PERIOD_MSEC = 250,
60
61    /* max # of peers to ask fer per torrent per reconnect pulse */
62    MAX_RECONNECTIONS_PER_PULSE = 2,
63
64    /* max number of peers to ask for per second overall.
65    * this throttle is to avoid overloading the router */
66    MAX_CONNECTIONS_PER_SECOND = 4,
67
68    /* number of unchoked peers per torrent.
69     * FIXME: this probably ought to be configurable */
70    MAX_UNCHOKED_PEERS = 12,
71
72    /* number of bad pieces a peer is allowed to send before we ban them */
73    MAX_BAD_PIECES_PER_PEER = 3,
74
75    /* use for bitwise operations w/peer_atom.myflags */
76    MYFLAG_BANNED = 1,
77
78    /* unreachable for now... but not banned.
79     * if they try to connect to us it's okay */
80    MYFLAG_UNREACHABLE = 2,
81
82    /* the minimum we'll wait before attempting to reconnect to a peer */
83    MINIMUM_RECONNECT_INTERVAL_SECS = 5
84};
85
86
87/**
88***
89**/
90
91/* We keep one of these for every peer we know about, whether
92 * it's connected or not, so the struct must be small.
93 * When our current connections underperform, we dip back
94 * into this list for new ones. */
95struct peer_atom
96{
97    uint8_t           from;
98    uint8_t           flags; /* these match the added_f flags */
99    uint8_t           myflags; /* flags that aren't defined in added_f */
100    uint16_t          port;
101    uint16_t          numFails;
102    struct in_addr    addr;
103    time_t            time; /* when the peer's connection status last changed */
104    time_t            piece_data_time;
105};
106
107typedef struct
108{
109    unsigned int    isRunning : 1;
110
111    uint8_t         hash[SHA_DIGEST_LENGTH];
112    int         *   pendingRequestCount;
113    tr_ptrArray *   outgoingHandshakes; /* tr_handshake */
114    tr_ptrArray *   pool; /* struct peer_atom */
115    tr_ptrArray *   peers; /* tr_peer */
116    tr_ptrArray *   webseeds; /* tr_webseed */
117    tr_timer *      reconnectTimer;
118    tr_timer *      rechokeTimer;
119    tr_timer *      refillTimer;
120    tr_torrent *    tor;
121    tr_peer *       optimistic; /* the optimistic peer, or NULL if none */
122
123    struct tr_peerMgr * manager;
124}
125Torrent;
126
127struct tr_peerMgr
128{
129    tr_session      * session;
130    tr_ptrArray     * torrents; /* Torrent */
131    tr_ptrArray     * incomingHandshakes; /* tr_handshake */
132    tr_timer        * bandwidthTimer;
133    tr_ratecontrol  * globalPoolRawSpeed[2];
134};
135
136#define tordbg( t, ... ) \
137    do { \
138        if( tr_deepLoggingIsActive( ) ) \
139            tr_deepLog( __FILE__, __LINE__, t->tor->info.name, __VA_ARGS__ ); \
140    } while( 0 )
141
142#define dbgmsg( ... ) \
143    do { \
144        if( tr_deepLoggingIsActive( ) ) \
145            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
146    } while( 0 )
147
148/**
149***
150**/
151
152static void
153managerLock( const struct tr_peerMgr * manager )
154{
155    tr_globalLock( manager->session );
156}
157
158static void
159managerUnlock( const struct tr_peerMgr * manager )
160{
161    tr_globalUnlock( manager->session );
162}
163
164static void
165torrentLock( Torrent * torrent )
166{
167    managerLock( torrent->manager );
168}
169
170static void
171torrentUnlock( Torrent * torrent )
172{
173    managerUnlock( torrent->manager );
174}
175
176static int
177torrentIsLocked( const Torrent * t )
178{
179    return tr_globalIsLocked( t->manager->session );
180}
181
182/**
183***
184**/
185
186static int
187compareAddresses( const struct in_addr * a,
188                  const struct in_addr * b )
189{
190    if( a->s_addr != b->s_addr )
191        return a->s_addr < b->s_addr ? -1 : 1;
192
193    return 0;
194}
195
196static int
197handshakeCompareToAddr( const void * va,
198                        const void * vb )
199{
200    const tr_handshake * a = va;
201
202    return compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
203}
204
205static int
206handshakeCompare( const void * a,
207                  const void * b )
208{
209    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
210}
211
212static tr_handshake*
213getExistingHandshake( tr_ptrArray *          handshakes,
214                      const struct in_addr * in_addr )
215{
216    return tr_ptrArrayFindSorted( handshakes,
217                                  in_addr,
218                                  handshakeCompareToAddr );
219}
220
221static int
222comparePeerAtomToAddress( const void * va,
223                          const void * vb )
224{
225    const struct peer_atom * a = va;
226
227    return compareAddresses( &a->addr, vb );
228}
229
230static int
231comparePeerAtoms( const void * va,
232                  const void * vb )
233{
234    const struct peer_atom * b = vb;
235
236    return comparePeerAtomToAddress( va, &b->addr );
237}
238
239/**
240***
241**/
242
243static int
244torrentCompare( const void * va,
245                const void * vb )
246{
247    const Torrent * a = va;
248    const Torrent * b = vb;
249
250    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
251}
252
253static int
254torrentCompareToHash( const void * va,
255                      const void * vb )
256{
257    const Torrent * a = va;
258    const uint8_t * b_hash = vb;
259
260    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
261}
262
263static Torrent*
264getExistingTorrent( tr_peerMgr *    manager,
265                    const uint8_t * hash )
266{
267    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
268                                             hash,
269                                             torrentCompareToHash );
270}
271
272static int
273peerCompare( const void * va,
274             const void * vb )
275{
276    const tr_peer * a = va;
277    const tr_peer * b = vb;
278
279    return compareAddresses( &a->in_addr, &b->in_addr );
280}
281
282static int
283peerCompareToAddr( const void * va,
284                   const void * vb )
285{
286    const tr_peer * a = va;
287
288    return compareAddresses( &a->in_addr, vb );
289}
290
291static tr_peer*
292getExistingPeer( Torrent *              torrent,
293                 const struct in_addr * in_addr )
294{
295    assert( torrentIsLocked( torrent ) );
296    assert( in_addr );
297
298    return tr_ptrArrayFindSorted( torrent->peers,
299                                  in_addr,
300                                  peerCompareToAddr );
301}
302
303static struct peer_atom*
304getExistingAtom( const                  Torrent * t,
305                 const struct in_addr * addr )
306{
307    assert( torrentIsLocked( t ) );
308    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
309}
310
311static int
312peerIsInUse( const Torrent *        ct,
313             const struct in_addr * addr )
314{
315    Torrent * t = (Torrent*) ct;
316
317    assert( torrentIsLocked ( t ) );
318
319    return getExistingPeer( t, addr )
320           || getExistingHandshake( t->outgoingHandshakes, addr )
321           || getExistingHandshake( t->manager->incomingHandshakes, addr );
322}
323
324static tr_peer*
325peerConstructor( const struct in_addr * in_addr )
326{
327    tr_peer * p;
328
329    p = tr_new0( tr_peer, 1 );
330    memcpy( &p->in_addr, in_addr, sizeof( struct in_addr ) );
331    p->pieceSpeed[TR_CLIENT_TO_PEER] = tr_rcInit( );
332    p->pieceSpeed[TR_PEER_TO_CLIENT] = tr_rcInit( );
333    return p;
334}
335
336static tr_peer*
337getPeer( Torrent *              torrent,
338         const struct in_addr * in_addr )
339{
340    tr_peer * peer;
341
342    assert( torrentIsLocked( torrent ) );
343
344    peer = getExistingPeer( torrent, in_addr );
345
346    if( peer == NULL )
347    {
348        peer = peerConstructor( in_addr );
349        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
350    }
351
352    return peer;
353}
354
355static void
356peerDestructor( tr_peer * peer )
357{
358    assert( peer );
359    assert( peer->msgs );
360
361    tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
362    tr_peerMsgsFree( peer->msgs );
363
364    tr_peerIoFree( peer->io );
365
366    tr_bitfieldFree( peer->have );
367    tr_bitfieldFree( peer->blame );
368    tr_free( peer->client );
369
370    tr_rcClose( peer->pieceSpeed[TR_CLIENT_TO_PEER] );
371    tr_rcClose( peer->pieceSpeed[TR_PEER_TO_CLIENT] );
372    tr_free( peer );
373}
374
375static void
376removePeer( Torrent * t,
377            tr_peer * peer )
378{
379    tr_peer *          removed;
380    struct peer_atom * atom;
381
382    assert( torrentIsLocked( t ) );
383
384    atom = getExistingAtom( t, &peer->in_addr );
385    assert( atom );
386    atom->time = time( NULL );
387
388    removed = tr_ptrArrayRemoveSorted( t->peers, peer, peerCompare );
389    assert( removed == peer );
390    peerDestructor( removed );
391}
392
393static void
394removeAllPeers( Torrent * t )
395{
396    while( !tr_ptrArrayEmpty( t->peers ) )
397        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
398}
399
400static void
401torrentDestructor( void * vt )
402{
403    Torrent * t = vt;
404    uint8_t   hash[SHA_DIGEST_LENGTH];
405
406    assert( t );
407    assert( !t->isRunning );
408    assert( t->peers );
409    assert( torrentIsLocked( t ) );
410    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
411    assert( tr_ptrArrayEmpty( t->peers ) );
412
413    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
414
415    tr_timerFree( &t->reconnectTimer );
416    tr_timerFree( &t->rechokeTimer );
417    tr_timerFree( &t->refillTimer );
418
419    tr_ptrArrayFree( t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
420    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
421    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
422    tr_ptrArrayFree( t->peers, NULL );
423
424    tr_free( t->pendingRequestCount );
425    tr_free( t );
426}
427
428static void peerCallbackFunc( void * vpeer,
429                              void * vevent,
430                              void * vt );
431
432static Torrent*
433torrentConstructor( tr_peerMgr * manager,
434                    tr_torrent * tor )
435{
436    int       i;
437    Torrent * t;
438
439    t = tr_new0( Torrent, 1 );
440    t->manager = manager;
441    t->tor = tor;
442    t->pool = tr_ptrArrayNew( );
443    t->peers = tr_ptrArrayNew( );
444    t->webseeds = tr_ptrArrayNew( );
445    t->outgoingHandshakes = tr_ptrArrayNew( );
446    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
447
448    for( i = 0; i < tor->info.webseedCount; ++i )
449    {
450        tr_webseed * w =
451            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc,
452                           t );
453        tr_ptrArrayAppend( t->webseeds, w );
454    }
455
456    return t;
457}
458
459/**
460 * For explanation, see http://www.bittorrent.org/fast_extensions.html
461 * Also see the "test-allowed-set" unit test
462 *
463 * @param k number of pieces in set
464 * @param sz number of pieces in the torrent
465 * @param infohash torrent's SHA1 hash
466 * @param ip peer's address
467 */
468struct tr_bitfield *
469tr_peerMgrGenerateAllowedSet(
470    const uint32_t         k,
471    const uint32_t         sz,
472    const                  uint8_t        *
473                           infohash,
474    const struct in_addr * ip )
475{
476    uint8_t       w[SHA_DIGEST_LENGTH + 4];
477    uint8_t       x[SHA_DIGEST_LENGTH];
478    tr_bitfield * a;
479    uint32_t      a_size;
480
481    *(uint32_t*)w = ntohl( htonl( ip->s_addr ) & 0xffffff00 );   /* (1) */
482    memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );              /* (2) */
483    tr_sha1( x, w, sizeof( w ), NULL );                        /* (3) */
484
485    a = tr_bitfieldNew( sz );
486    a_size = 0;
487
488    while( a_size < k )
489    {
490        int i;
491        for( i = 0; i < 5 && a_size < k; ++i )                      /* (4) */
492        {
493            uint32_t j = i * 4;                                /* (5) */
494            uint32_t y = ntohl( *( uint32_t* )( x + j ) );             /* (6) */
495            uint32_t index = y % sz;                           /* (7) */
496            if( !tr_bitfieldHas( a, index ) )                  /* (8) */
497            {
498                tr_bitfieldAdd( a, index );                    /* (9) */
499                ++a_size;
500            }
501        }
502        tr_sha1( x, x, sizeof( x ), NULL );                    /* (3) */
503    }
504
505    return a;
506}
507
508static int bandwidthPulse( void * vmgr );
509
510
511tr_peerMgr*
512tr_peerMgrNew( tr_session * session )
513{
514    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
515
516    m->session = session;
517    m->torrents = tr_ptrArrayNew( );
518    m->incomingHandshakes = tr_ptrArrayNew( );
519    m->globalPoolRawSpeed[TR_CLIENT_TO_PEER] = tr_rcInit( );
520    m->globalPoolRawSpeed[TR_PEER_TO_CLIENT] = tr_rcInit( );
521    m->bandwidthTimer = tr_timerNew( session, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
522    return m;
523}
524
525void
526tr_peerMgrFree( tr_peerMgr * manager )
527{
528    managerLock( manager );
529
530    tr_timerFree( &manager->bandwidthTimer );
531    tr_rcClose( manager->globalPoolRawSpeed[TR_CLIENT_TO_PEER] );
532    tr_rcClose( manager->globalPoolRawSpeed[TR_PEER_TO_CLIENT] );
533
534    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
535     * the item from manager->handshakes, so this is a little roundabout... */
536    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
537        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
538
539    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
540
541    /* free the torrents. */
542    tr_ptrArrayFree( manager->torrents, torrentDestructor );
543
544    managerUnlock( manager );
545    tr_free( manager );
546}
547
548static tr_peer**
549getConnectedPeers( Torrent * t,
550                   int *     setmeCount )
551{
552    int       i, peerCount, connectionCount;
553    tr_peer **peers;
554    tr_peer **ret;
555
556    assert( torrentIsLocked( t ) );
557
558    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
559    ret = tr_new( tr_peer *, peerCount );
560
561    for( i = connectionCount = 0; i < peerCount; ++i )
562        if( peers[i]->msgs )
563            ret[connectionCount++] = peers[i];
564
565    *setmeCount = connectionCount;
566    return ret;
567}
568
569static int
570clientIsDownloadingFrom( const tr_peer * peer )
571{
572    return peer->clientIsInterested && !peer->clientIsChoked;
573}
574
575static int
576clientIsUploadingTo( const tr_peer * peer )
577{
578    return peer->peerIsInterested && !peer->peerIsChoked;
579}
580
581/***
582****
583***/
584
585int
586tr_peerMgrPeerIsSeed( const tr_peerMgr *     mgr,
587                      const uint8_t *        torrentHash,
588                      const struct in_addr * addr )
589{
590    int                      isSeed = FALSE;
591    const Torrent *          t = NULL;
592    const struct peer_atom * atom = NULL;
593
594    t = getExistingTorrent( (tr_peerMgr*)mgr, torrentHash );
595    if( t )
596        atom = getExistingAtom( t, addr );
597    if( atom )
598        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
599
600    return isSeed;
601}
602
603/****
604*****
605*****  REFILL
606*****
607****/
608
609static void
610assertValidPiece( Torrent * t, tr_piece_index_t piece )
611{
612    assert( t );
613    assert( t->tor );
614    assert( piece < t->tor->info.pieceCount );
615}
616
617static int
618getPieceRequests( Torrent * t, tr_piece_index_t piece )
619{
620    assertValidPiece( t, piece );
621
622    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
623}
624
625static void
626incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
627{
628    assertValidPiece( t, piece );
629
630    if( t->pendingRequestCount == NULL )
631        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
632    t->pendingRequestCount[piece]++;
633}
634
635static void
636decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
637{
638    assertValidPiece( t, piece );
639
640    if( t->pendingRequestCount )
641        t->pendingRequestCount[piece]--;
642}
643
644struct tr_refill_piece
645{
646    tr_priority_t    priority;
647    uint32_t         piece;
648    uint32_t         peerCount;
649    int              random;
650    int              pendingRequestCount;
651    int              missingBlockCount;
652};
653
654static int
655compareRefillPiece( const void * aIn,
656                    const void * bIn )
657{
658    const struct tr_refill_piece * a = aIn;
659    const struct tr_refill_piece * b = bIn;
660
661    /* if one piece has a higher priority, it goes first */
662    if( a->priority != b->priority )
663        return a->priority > b->priority ? -1 : 1;
664
665    /* have a per-priority endgame */
666    if( a->pendingRequestCount != b->pendingRequestCount )
667        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
668
669    /* fewer missing pieces goes first */
670    if( a->missingBlockCount != b->missingBlockCount )
671        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
672
673    /* otherwise if one has fewer peers, it goes first */
674    if( a->peerCount != b->peerCount )
675        return a->peerCount < b->peerCount ? -1 : 1;
676
677    /* otherwise go with our random seed */
678    if( a->random != b->random )
679        return a->random < b->random ? -1 : 1;
680
681    return 0;
682}
683
684static tr_piece_index_t *
685getPreferredPieces( Torrent           * t,
686                    tr_piece_index_t  * pieceCount )
687{
688    const tr_torrent  * tor = t->tor;
689    const tr_info     * inf = &tor->info;
690    tr_piece_index_t    i;
691    tr_piece_index_t    poolSize = 0;
692    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
693    int                 peerCount;
694    tr_peer**           peers;
695
696    assert( torrentIsLocked( t ) );
697
698    peers = getConnectedPeers( t, &peerCount );
699
700    /* make a list of the pieces that we want but don't have */
701    for( i = 0; i < inf->pieceCount; ++i )
702        if( !tor->info.pieces[i].dnd
703                && !tr_cpPieceIsComplete( tor->completion, i ) )
704            pool[poolSize++] = i;
705
706    /* sort the pool by which to request next */
707    if( poolSize > 1 )
708    {
709        tr_piece_index_t j;
710        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
711
712        for( j = 0; j < poolSize; ++j )
713        {
714            int k;
715            const tr_piece_index_t piece = pool[j];
716            struct tr_refill_piece * setme = p + j;
717
718            setme->piece = piece;
719            setme->priority = inf->pieces[piece].priority;
720            setme->peerCount = 0;
721            setme->random = tr_cryptoWeakRandInt( INT_MAX );
722            setme->pendingRequestCount = getPieceRequests( t, piece );
723            setme->missingBlockCount
724                         = tr_cpMissingBlocksInPiece( tor->completion, piece );
725
726            for( k = 0; k < peerCount; ++k )
727            {
728                const tr_peer * peer = peers[k];
729                if( peer->peerIsInterested
730                        && !peer->clientIsChoked
731                        && tr_bitfieldHas( peer->have, piece ) )
732                    ++setme->peerCount;
733            }
734        }
735
736        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
737               compareRefillPiece );
738
739        for( j = 0; j < poolSize; ++j )
740            pool[j] = p[j].piece;
741
742        tr_free( p );
743    }
744
745    tr_free( peers );
746
747    *pieceCount = poolSize;
748    return pool;
749}
750
751struct tr_blockIterator
752{
753    Torrent * t;
754    tr_block_index_t blockIndex, blockCount, *blocks;
755    tr_piece_index_t pieceIndex, pieceCount, *pieces;
756};
757
758static struct tr_blockIterator*
759blockIteratorNew( Torrent * t )
760{
761    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
762    i->t = t;
763    i->pieces = getPreferredPieces( t, &i->pieceCount );
764    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCount );
765    return i;
766}
767
768static int
769blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
770{
771    int found;
772    Torrent * t = i->t;
773    tr_torrent * tor = t->tor;
774
775    while( ( i->blockIndex == i->blockCount )
776        && ( i->pieceIndex < i->pieceCount ) )
777    {
778        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
779        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
780        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
781        tr_block_index_t block;
782
783        assert( index < tor->info.pieceCount );
784
785        i->blockCount = 0;
786        i->blockIndex = 0;
787        for( block=b; block!=e; ++block )
788            if( !tr_cpBlockIsComplete( tor->completion, block ) )
789                i->blocks[i->blockCount++] = block;
790    }
791
792    if(( found = ( i->blockIndex < i->blockCount )))
793        *setme = i->blocks[i->blockIndex++];
794
795    return found;
796}
797
798static void
799blockIteratorFree( struct tr_blockIterator * i )
800{
801    tr_free( i->blocks );
802    tr_free( i->pieces );
803    tr_free( i );
804}
805
806static tr_peer**
807getPeersUploadingToClient( Torrent * t,
808                           int *     setmeCount )
809{
810    int j;
811    int peerCount = 0;
812    int retCount = 0;
813    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
814    tr_peer ** ret = tr_new( tr_peer *, peerCount );
815
816    j = 0; /* this is a temporary test to make sure we walk through all the peers */
817    if( peerCount )
818    {
819        /* Get a list of peers we're downloading from.
820           Pick a different starting point each time so all peers
821           get a chance at being the first in line */
822        const int fencepost = tr_cryptoWeakRandInt( peerCount );
823        int i = fencepost;
824        do {
825            if( clientIsDownloadingFrom( peers[i] ) )
826                ret[retCount++] = peers[i];
827            i = ( i + 1 ) % peerCount;
828            ++j;
829        } while( i != fencepost );
830    }
831    assert( j == peerCount );
832    *setmeCount = retCount;
833    return ret;
834}
835
836static uint32_t
837getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
838{
839    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
840    const uint64_t blockPos = tor->blockSize * b;
841    assert( blockPos >= piecePos );
842    return (uint32_t)( blockPos - piecePos );
843}
844
845static int
846refillPulse( void * vtorrent )
847{
848    tr_block_index_t block;
849    int peerCount;
850    int webseedCount;
851    tr_peer ** peers;
852    tr_webseed ** webseeds;
853    struct tr_blockIterator * blockIterator;
854    Torrent * t = vtorrent;
855    tr_torrent * tor = t->tor;
856
857    if( !t->isRunning )
858        return TRUE;
859    if( tr_torrentIsSeed( t->tor ) )
860        return TRUE;
861
862    torrentLock( t );
863    tordbg( t, "Refilling Request Buffers..." );
864
865    blockIterator = blockIteratorNew( t );
866    peers = getPeersUploadingToClient( t, &peerCount );
867    webseedCount = tr_ptrArraySize( t->webseeds );
868    webseeds = tr_memdup( tr_ptrArrayBase( t->webseeds ),
869                          webseedCount * sizeof( tr_webseed* ) );
870
871    while( ( webseedCount || peerCount )
872        && blockIteratorNext( blockIterator, &block ) )
873    {
874        int j;
875        int handled = FALSE;
876
877        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
878        const uint32_t offset = getBlockOffsetInPiece( tor, block );
879        const uint32_t length = tr_torBlockCountBytes( tor, block );
880
881        /* find a peer who can ask for this block */
882        for( j=0; !handled && j<peerCount; )
883        {
884            const int val = tr_peerMsgsAddRequest( peers[j]->msgs,
885                                                   index, offset, length );
886            switch( val )
887            {
888                case TR_ADDREQ_FULL:
889                case TR_ADDREQ_CLIENT_CHOKED:
890                    peers[j] = peers[--peerCount];
891                    break;
892
893                case TR_ADDREQ_MISSING:
894                case TR_ADDREQ_DUPLICATE:
895                    ++j;
896                    break;
897
898                case TR_ADDREQ_OK:
899                    incrementPieceRequests( t, index );
900                    handled = TRUE;
901                    break;
902
903                default:
904                    assert( 0 && "unhandled value" );
905                    break;
906            }
907        }
908
909        /* maybe one of the webseeds can do it */
910        for( j=0; !handled && j<webseedCount; )
911        {
912            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j],
913                                                          index, offset, length );
914            switch( val )
915            {
916                case TR_ADDREQ_FULL:
917                    webseeds[j] = webseeds[--webseedCount];
918                    break;
919
920                case TR_ADDREQ_OK:
921                    incrementPieceRequests( t, index );
922                    handled = TRUE;
923                    break;
924
925                default:
926                    assert( 0 && "unhandled value" );
927                    break;
928            }
929        }
930    }
931
932    /* cleanup */
933    blockIteratorFree( blockIterator );
934    tr_free( webseeds );
935    tr_free( peers );
936
937    t->refillTimer = NULL;
938    torrentUnlock( t );
939    return FALSE;
940}
941
942static void
943broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
944{
945    int i, size;
946    tr_peer ** peers;
947
948    assert( torrentIsLocked( t ) );
949
950    peers = getConnectedPeers( t, &size );
951    for( i=0; i<size; ++i )
952        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
953    tr_free( peers );
954}
955
956static void
957addStrike( Torrent * t,
958           tr_peer * peer )
959{
960    tordbg( t, "increasing peer %s strike count to %d",
961            tr_peerIoAddrStr( &peer->in_addr,
962                              peer->port ), peer->strikes + 1 );
963
964    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
965    {
966        struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
967        atom->myflags |= MYFLAG_BANNED;
968        peer->doPurge = 1;
969        tordbg( t, "banning peer %s",
970               tr_peerIoAddrStr( &atom->addr, atom->port ) );
971    }
972}
973
974static void
975gotBadPiece( Torrent *        t,
976             tr_piece_index_t pieceIndex )
977{
978    tr_torrent *   tor = t->tor;
979    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
980
981    tor->corruptCur += byteCount;
982    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
983}
984
985static void
986refillSoon( Torrent * t )
987{
988    if( t->refillTimer == NULL )
989        t->refillTimer = tr_timerNew( t->manager->session,
990                                      refillPulse, t,
991                                      REFILL_PERIOD_MSEC );
992}
993
994static void
995peerCallbackFunc( void * vpeer,
996                  void * vevent,
997                  void * vt )
998{
999    tr_peer *             peer = vpeer; /* may be NULL if peer is a webseed */
1000    Torrent *             t = (Torrent *) vt;
1001    const tr_peer_event * e = vevent;
1002
1003    torrentLock( t );
1004
1005    switch( e->eventType )
1006    {
1007        case TR_PEER_NEED_REQ:
1008            refillSoon( t );
1009            break;
1010
1011        case TR_PEER_CANCEL:
1012            decrementPieceRequests( t, e->pieceIndex );
1013            break;
1014
1015        case TR_PEER_PEER_GOT_DATA:
1016        {
1017            const time_t now = time( NULL );
1018            tr_torrent * tor = t->tor;
1019            tor->activityDate = now;
1020            tor->uploadedCur += e->length;
1021            if( peer )
1022                tr_rcTransferred ( peer->pieceSpeed[TR_CLIENT_TO_PEER], e->length );
1023            tr_rcTransferred ( tor->pieceSpeed[TR_CLIENT_TO_PEER], e->length );
1024            tr_rcTransferred ( tor->session->pieceSpeed[TR_CLIENT_TO_PEER], e->length );
1025            tr_statsAddUploaded( tor->session, e->length );
1026            if( peer )
1027            {
1028                struct peer_atom * a = getExistingAtom( t, &peer->in_addr );
1029                a->piece_data_time = now;
1030            }
1031            break;
1032        }
1033
1034        case TR_PEER_CLIENT_GOT_DATA:
1035        {
1036            const time_t now = time( NULL );
1037            tr_torrent * tor = t->tor;
1038            tor->activityDate = now;
1039            tr_statsAddDownloaded( tor->session, e->length );
1040            if( peer )
1041                tr_rcTransferred ( peer->pieceSpeed[TR_PEER_TO_CLIENT], e->length );
1042            tr_rcTransferred ( tor->pieceSpeed[TR_PEER_TO_CLIENT], e->length );
1043            tr_rcTransferred ( tor->session->pieceSpeed[TR_PEER_TO_CLIENT], e->length );
1044            /* only add this to downloadedCur if we got it from a peer --
1045             * webseeds shouldn't count against our ratio.  As one tracker
1046             * admin put it, "Those pieces are downloaded directly from the
1047             * content distributor, not the peers, it is the tracker's job
1048             * to manage the swarms, not the web server and does not fit
1049             * into the jurisdiction of the tracker." */
1050            if( peer )
1051                tor->downloadedCur += e->length;
1052            if( peer ) {
1053                struct peer_atom * a = getExistingAtom( t, &peer->in_addr );
1054                a->piece_data_time = now;
1055            }
1056            break;
1057        }
1058
1059        case TR_PEER_PEER_PROGRESS:
1060        {
1061            if( peer )
1062            {
1063                struct peer_atom * atom = getExistingAtom( t,
1064                                                           &peer->in_addr );
1065                const int          peerIsSeed = e->progress >= 1.0;
1066                if( peerIsSeed )
1067                {
1068                    tordbg( t, "marking peer %s as a seed",
1069                           tr_peerIoAddrStr( &atom->addr,
1070                                             atom->port ) );
1071                    atom->flags |= ADDED_F_SEED_FLAG;
1072                }
1073                else
1074                {
1075                    tordbg( t, "marking peer %s as a non-seed",
1076                           tr_peerIoAddrStr( &atom->addr,
1077                                             atom->port ) );
1078                    atom->flags &= ~ADDED_F_SEED_FLAG;
1079                }
1080            }
1081            break;
1082        }
1083
1084        case TR_PEER_CLIENT_GOT_BLOCK:
1085        {
1086            tr_torrent *     tor = t->tor;
1087
1088            tr_block_index_t block = _tr_block( tor, e->pieceIndex,
1089                                                e->offset );
1090
1091            tr_cpBlockAdd( tor->completion, block );
1092            decrementPieceRequests( t, e->pieceIndex );
1093
1094            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1095
1096            if( tr_cpPieceIsComplete( tor->completion, e->pieceIndex ) )
1097            {
1098                const tr_piece_index_t p = e->pieceIndex;
1099                const int              ok = tr_ioTestPiece( tor, p );
1100
1101                if( !ok )
1102                {
1103                    tr_torerr( tor,
1104                              _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1105                              (unsigned long)p );
1106                }
1107
1108                tr_torrentSetHasPiece( tor, p, ok );
1109                tr_torrentSetPieceChecked( tor, p, TRUE );
1110                tr_peerMgrSetBlame( tor->session->peerMgr, tor->info.hash, p, ok );
1111
1112                if( !ok )
1113                    gotBadPiece( t, p );
1114                else
1115                {
1116                    int        i, peerCount;
1117                    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1118                    for( i = 0; i < peerCount; ++i )
1119                        tr_peerMsgsHave( peers[i]->msgs, p );
1120                    tr_free( peers );
1121                }
1122
1123                tr_torrentRecheckCompleteness( tor );
1124            }
1125            break;
1126        }
1127
1128        case TR_PEER_ERROR:
1129            if( e->err == EINVAL )
1130            {
1131                addStrike( t, peer );
1132                peer->doPurge = 1;
1133            }
1134            else if( ( e->err == ERANGE )
1135                  || ( e->err == EMSGSIZE )
1136                  || ( e->err == ENOTCONN ) )
1137            {
1138                /* some protocol error from the peer */
1139                peer->doPurge = 1;
1140            }
1141            else /* a local error, such as an IO error */
1142            {
1143                t->tor->error = e->err;
1144                tr_strlcpy( t->tor->errorString,
1145                            tr_strerror( t->tor->error ),
1146                            sizeof( t->tor->errorString ) );
1147                tr_torrentStop( t->tor );
1148            }
1149            break;
1150
1151        default:
1152            assert( 0 );
1153    }
1154
1155    torrentUnlock( t );
1156}
1157
1158static void
1159ensureAtomExists( Torrent *              t,
1160                  const struct in_addr * addr,
1161                  uint16_t               port,
1162                  uint8_t                flags,
1163                  uint8_t                from )
1164{
1165    if( getExistingAtom( t, addr ) == NULL )
1166    {
1167        struct peer_atom * a;
1168        a = tr_new0( struct peer_atom, 1 );
1169        a->addr = *addr;
1170        a->port = port;
1171        a->flags = flags;
1172        a->from = from;
1173        tordbg( t, "got a new atom: %s",
1174               tr_peerIoAddrStr( &a->addr, a->port ) );
1175        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
1176    }
1177}
1178
1179static int
1180getMaxPeerCount( const tr_torrent * tor )
1181{
1182    return tor->maxConnectedPeers;
1183}
1184
1185static int
1186getPeerCount( const Torrent * t )
1187{
1188    return tr_ptrArraySize( t->peers ) + tr_ptrArraySize(
1189               t->outgoingHandshakes );
1190}
1191
1192/* FIXME: this is kind of a mess. */
1193static int
1194myHandshakeDoneCB( tr_handshake *  handshake,
1195                   tr_peerIo *     io,
1196                   int             isConnected,
1197                   const uint8_t * peer_id,
1198                   void *          vmanager )
1199{
1200    int                    ok = isConnected;
1201    int                    success = FALSE;
1202    uint16_t               port;
1203    const struct in_addr * addr;
1204    tr_peerMgr *           manager = (tr_peerMgr*) vmanager;
1205    Torrent *              t;
1206    tr_handshake *         ours;
1207
1208    assert( io );
1209    assert( isConnected == 0 || isConnected == 1 );
1210
1211    t = tr_peerIoHasTorrentHash( io )
1212        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1213        : NULL;
1214
1215    if( tr_peerIoIsIncoming ( io ) )
1216        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
1217                                        handshake, handshakeCompare );
1218    else if( t )
1219        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
1220                                        handshake, handshakeCompare );
1221    else
1222        ours = handshake;
1223
1224    assert( ours );
1225    assert( ours == handshake );
1226
1227    if( t )
1228        torrentLock( t );
1229
1230    addr = tr_peerIoGetAddress( io, &port );
1231
1232    if( !ok || !t || !t->isRunning )
1233    {
1234        if( t )
1235        {
1236            struct peer_atom * atom = getExistingAtom( t, addr );
1237            if( atom )
1238                ++atom->numFails;
1239        }
1240
1241        tr_peerIoFree( io );
1242    }
1243    else /* looking good */
1244    {
1245        struct peer_atom * atom;
1246        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1247        atom = getExistingAtom( t, addr );
1248        atom->time = time( NULL );
1249
1250        if( atom->myflags & MYFLAG_BANNED )
1251        {
1252            tordbg( t, "banned peer %s tried to reconnect",
1253                   tr_peerIoAddrStr( &atom->addr,
1254                                     atom->port ) );
1255            tr_peerIoFree( io );
1256        }
1257        else if( tr_peerIoIsIncoming( io )
1258               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1259
1260        {
1261            tr_peerIoFree( io );
1262        }
1263        else
1264        {
1265            tr_peer * peer = getExistingPeer( t, addr );
1266
1267            if( peer ) /* we already have this peer */
1268            {
1269                tr_peerIoFree( io );
1270            }
1271            else
1272            {
1273                peer = getPeer( t, addr );
1274                tr_free( peer->client );
1275
1276                if( !peer_id )
1277                    peer->client = NULL;
1278                else
1279                {
1280                    char client[128];
1281                    tr_clientForId( client, sizeof( client ), peer_id );
1282                    peer->client = tr_strdup( client );
1283                }
1284                peer->port = port;
1285                peer->io = io;
1286                peer->msgs =
1287                    tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t,
1288                                    &peer->msgsTag );
1289
1290                success = TRUE;
1291            }
1292        }
1293    }
1294
1295    if( t )
1296        torrentUnlock( t );
1297
1298    return success;
1299}
1300
1301void
1302tr_peerMgrAddIncoming( tr_peerMgr *     manager,
1303                       struct in_addr * addr,
1304                       uint16_t         port,
1305                       int              socket )
1306{
1307    managerLock( manager );
1308
1309    if( tr_sessionIsAddressBlocked( manager->session, addr ) )
1310    {
1311        tr_dbg( "Banned IP address \"%s\" tried to connect to us",
1312               inet_ntoa( *addr ) );
1313        tr_netClose( socket );
1314    }
1315    else if( getExistingHandshake( manager->incomingHandshakes, addr ) )
1316    {
1317        tr_netClose( socket );
1318    }
1319    else /* we don't have a connetion to them yet... */
1320    {
1321        tr_peerIo *    io;
1322        tr_handshake * handshake;
1323
1324        io = tr_peerIoNewIncoming( manager->session, addr, port, socket );
1325
1326        handshake = tr_handshakeNew( io,
1327                                     manager->session->encryptionMode,
1328                                     myHandshakeDoneCB,
1329                                     manager );
1330
1331        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake,
1332                                 handshakeCompare );
1333    }
1334
1335    managerUnlock( manager );
1336}
1337
1338void
1339tr_peerMgrAddPex( tr_peerMgr *    manager,
1340                  const uint8_t * torrentHash,
1341                  uint8_t         from,
1342                  const tr_pex *  pex )
1343{
1344    Torrent * t;
1345
1346    managerLock( manager );
1347
1348    t = getExistingTorrent( manager, torrentHash );
1349    if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->in_addr ) )
1350        ensureAtomExists( t, &pex->in_addr, pex->port, pex->flags, from );
1351
1352    managerUnlock( manager );
1353}
1354
1355tr_pex *
1356tr_peerMgrCompactToPex( const void *    compact,
1357                        size_t          compactLen,
1358                        const uint8_t * added_f,
1359                        size_t          added_f_len,
1360                        size_t *        pexCount )
1361{
1362    size_t          i;
1363    size_t          n = compactLen / 6;
1364    const uint8_t * walk = compact;
1365    tr_pex *        pex = tr_new0( tr_pex, n );
1366
1367    for( i = 0; i < n; ++i )
1368    {
1369        memcpy( &pex[i].in_addr, walk, 4 ); walk += 4;
1370        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1371        if( added_f && ( n == added_f_len ) )
1372            pex[i].flags = added_f[i];
1373    }
1374
1375    *pexCount = n;
1376    return pex;
1377}
1378
1379/**
1380***
1381**/
1382
1383void
1384tr_peerMgrSetBlame( tr_peerMgr *     manager,
1385                    const uint8_t *  torrentHash,
1386                    tr_piece_index_t pieceIndex,
1387                    int              success )
1388{
1389    if( !success )
1390    {
1391        int        peerCount, i;
1392        Torrent *  t = getExistingTorrent( manager, torrentHash );
1393        tr_peer ** peers;
1394
1395        assert( torrentIsLocked( t ) );
1396
1397        peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1398        for( i = 0; i < peerCount; ++i )
1399        {
1400            tr_peer * peer = peers[i];
1401            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1402            {
1403                tordbg(
1404                    t,
1405                    "peer %s contributed to corrupt piece (%d); now has %d strikes",
1406                    tr_peerIoAddrStr( &peer->in_addr, peer->port ),
1407                    pieceIndex, (int)peer->strikes + 1 );
1408                addStrike( t, peer );
1409            }
1410        }
1411    }
1412}
1413
1414int
1415tr_pexCompare( const void * va,
1416               const void * vb )
1417{
1418    const tr_pex * a = va;
1419    const tr_pex * b = vb;
1420    int            i =
1421        memcmp( &a->in_addr, &b->in_addr, sizeof( struct in_addr ) );
1422
1423    if( i ) return i;
1424    if( a->port < b->port ) return -1;
1425    if( a->port > b->port ) return 1;
1426    return 0;
1427}
1428
1429int tr_pexCompare( const void * a,
1430                   const void * b );
1431
1432static int
1433peerPrefersCrypto( const tr_peer * peer )
1434{
1435    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1436        return TRUE;
1437
1438    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1439        return FALSE;
1440
1441    return tr_peerIoIsEncrypted( peer->io );
1442}
1443
1444int
1445tr_peerMgrGetPeers( tr_peerMgr *    manager,
1446                    const uint8_t * torrentHash,
1447                    tr_pex **       setme_pex )
1448{
1449    int peerCount = 0;
1450    const Torrent *  t;
1451
1452    managerLock( manager );
1453
1454    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1455    if( !t )
1456    {
1457        *setme_pex = NULL;
1458    }
1459    else
1460    {
1461        int i;
1462        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1463        tr_pex * pex = tr_new( tr_pex, peerCount );
1464        tr_pex * walk = pex;
1465
1466        for( i = 0; i < peerCount; ++i, ++walk )
1467        {
1468            const tr_peer * peer = peers[i];
1469            walk->in_addr = peer->in_addr;
1470            walk->port = peer->port;
1471            walk->flags = 0;
1472            if( peerPrefersCrypto( peer ) ) walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1473            if( peer->progress >= 1.0 ) walk->flags |= ADDED_F_SEED_FLAG;
1474        }
1475
1476        assert( ( walk - pex ) == peerCount );
1477        qsort( pex, peerCount, sizeof( tr_pex ), tr_pexCompare );
1478        *setme_pex = pex;
1479    }
1480
1481    managerUnlock( manager );
1482    return peerCount;
1483}
1484
1485static int reconnectPulse( void * vtorrent );
1486
1487static int rechokePulse( void * vtorrent );
1488
1489void
1490tr_peerMgrStartTorrent( tr_peerMgr *    manager,
1491                        const uint8_t * torrentHash )
1492{
1493    Torrent * t;
1494
1495    managerLock( manager );
1496
1497    t = getExistingTorrent( manager, torrentHash );
1498
1499    assert( t );
1500    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1501    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1502
1503    if( !t->isRunning )
1504    {
1505        t->isRunning = 1;
1506
1507        t->reconnectTimer = tr_timerNew( t->manager->session,
1508                                         reconnectPulse, t,
1509                                         RECONNECT_PERIOD_MSEC );
1510
1511        t->rechokeTimer = tr_timerNew( t->manager->session,
1512                                       rechokePulse, t,
1513                                       RECHOKE_PERIOD_MSEC );
1514
1515        reconnectPulse( t );
1516
1517        rechokePulse( t );
1518
1519        if( !tr_ptrArrayEmpty( t->webseeds ) )
1520            refillSoon( t );
1521    }
1522
1523    managerUnlock( manager );
1524}
1525
1526static void
1527stopTorrent( Torrent * t )
1528{
1529    assert( torrentIsLocked( t ) );
1530
1531    t->isRunning = 0;
1532    tr_timerFree( &t->rechokeTimer );
1533    tr_timerFree( &t->reconnectTimer );
1534
1535    /* disconnect the peers. */
1536    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)peerDestructor );
1537    tr_ptrArrayClear( t->peers );
1538
1539    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1540     * which removes the handshake from t->outgoingHandshakes... */
1541    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1542        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1543}
1544
1545void
1546tr_peerMgrStopTorrent( tr_peerMgr *    manager,
1547                       const uint8_t * torrentHash )
1548{
1549    managerLock( manager );
1550
1551    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1552
1553    managerUnlock( manager );
1554}
1555
1556void
1557tr_peerMgrAddTorrent( tr_peerMgr * manager,
1558                      tr_torrent * tor )
1559{
1560    Torrent * t;
1561
1562    managerLock( manager );
1563
1564    assert( tor );
1565    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1566
1567    t = torrentConstructor( manager, tor );
1568    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1569
1570    managerUnlock( manager );
1571}
1572
1573void
1574tr_peerMgrRemoveTorrent( tr_peerMgr *    manager,
1575                         const uint8_t * torrentHash )
1576{
1577    Torrent * t;
1578
1579    managerLock( manager );
1580
1581    t = getExistingTorrent( manager, torrentHash );
1582    assert( t );
1583    stopTorrent( t );
1584    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1585    torrentDestructor( t );
1586
1587    managerUnlock( manager );
1588}
1589
1590void
1591tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1592                               const uint8_t *    torrentHash,
1593                               int8_t *           tab,
1594                               unsigned int       tabCount )
1595{
1596    tr_piece_index_t   i;
1597    const Torrent *    t;
1598    const tr_torrent * tor;
1599    float              interval;
1600    int                isComplete;
1601    int                peerCount;
1602    const tr_peer **   peers;
1603
1604    managerLock( manager );
1605
1606    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1607    tor = t->tor;
1608    interval = tor->info.pieceCount / (float)tabCount;
1609    isComplete = tor
1610                 && ( tr_cpGetStatus ( tor->completion ) == TR_CP_COMPLETE );
1611    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1612
1613    memset( tab, 0, tabCount );
1614
1615    for( i = 0; tor && i < tabCount; ++i )
1616    {
1617        const int piece = i * interval;
1618
1619        if( isComplete || tr_cpPieceIsComplete( tor->completion, piece ) )
1620            tab[i] = -1;
1621        else if( peerCount )
1622        {
1623            int j;
1624            for( j = 0; j < peerCount; ++j )
1625                if( tr_bitfieldHas( peers[j]->have, i ) )
1626                    ++tab[i];
1627        }
1628    }
1629
1630    managerUnlock( manager );
1631}
1632
1633/* Returns the pieces that are available from peers */
1634tr_bitfield*
1635tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1636                        const uint8_t *    torrentHash )
1637{
1638    int           i, size;
1639    Torrent *     t;
1640    tr_peer **    peers;
1641    tr_bitfield * pieces;
1642
1643    managerLock( manager );
1644
1645    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1646    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1647    peers = getConnectedPeers( t, &size );
1648    for( i = 0; i < size; ++i )
1649        tr_bitfieldOr( pieces, peers[i]->have );
1650
1651    managerUnlock( manager );
1652    tr_free( peers );
1653    return pieces;
1654}
1655
1656int
1657tr_peerMgrHasConnections( const tr_peerMgr * manager,
1658                          const uint8_t *    torrentHash )
1659{
1660    int             ret;
1661    const Torrent * t;
1662
1663    managerLock( manager );
1664
1665    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1666    ret = t && ( !tr_ptrArrayEmpty( t->peers )
1667               || !tr_ptrArrayEmpty( t->webseeds ) );
1668
1669    managerUnlock( manager );
1670    return ret;
1671}
1672
1673void
1674tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1675                        const uint8_t *    torrentHash,
1676                        int *              setmePeersKnown,
1677                        int *              setmePeersConnected,
1678                        int *              setmeSeedsConnected,
1679                        int *              setmeWebseedsSendingToUs,
1680                        int *              setmePeersSendingToUs,
1681                        int *              setmePeersGettingFromUs,
1682                        int *              setmePeersFrom )
1683{
1684    int                 i, size;
1685    const Torrent *     t;
1686    const tr_peer **    peers;
1687    const tr_webseed ** webseeds;
1688
1689    managerLock( manager );
1690
1691    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1692    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1693
1694    *setmePeersKnown           = tr_ptrArraySize( t->pool );
1695    *setmePeersConnected       = 0;
1696    *setmeSeedsConnected       = 0;
1697    *setmePeersGettingFromUs   = 0;
1698    *setmePeersSendingToUs     = 0;
1699    *setmeWebseedsSendingToUs  = 0;
1700
1701    for( i = 0; i < TR_PEER_FROM__MAX; ++i )
1702        setmePeersFrom[i] = 0;
1703
1704    for( i = 0; i < size; ++i )
1705    {
1706        const tr_peer *          peer = peers[i];
1707        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1708
1709        if( peer->io == NULL ) /* not connected */
1710            continue;
1711
1712        ++ * setmePeersConnected;
1713
1714        ++setmePeersFrom[atom->from];
1715
1716        if( clientIsDownloadingFrom( peer ) )
1717            ++ * setmePeersSendingToUs;
1718
1719        if( clientIsUploadingTo( peer ) )
1720            ++ * setmePeersGettingFromUs;
1721
1722        if( atom->flags & ADDED_F_SEED_FLAG )
1723            ++ * setmeSeedsConnected;
1724    }
1725
1726    webseeds = (const tr_webseed **) tr_ptrArrayPeek( t->webseeds, &size );
1727    for( i = 0; i < size; ++i )
1728    {
1729        if( tr_webseedIsActive( webseeds[i] ) )
1730            ++ * setmeWebseedsSendingToUs;
1731    }
1732
1733    managerUnlock( manager );
1734}
1735
1736float*
1737tr_peerMgrWebSpeeds( const tr_peerMgr * manager,
1738                     const uint8_t *    torrentHash )
1739{
1740    const Torrent *     t;
1741    const tr_webseed ** webseeds;
1742    int                 i;
1743    int                 webseedCount;
1744    float *             ret;
1745
1746    assert( manager );
1747    managerLock( manager );
1748
1749    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1750    webseeds = (const tr_webseed**) tr_ptrArrayPeek( t->webseeds,
1751                                                     &webseedCount );
1752    assert( webseedCount == t->tor->info.webseedCount );
1753    ret = tr_new0( float, webseedCount );
1754
1755    for( i = 0; i < webseedCount; ++i )
1756        if( !tr_webseedGetSpeed( webseeds[i], &ret[i] ) )
1757            ret[i] = -1.0;
1758
1759    managerUnlock( manager );
1760    return ret;
1761}
1762
1763double
1764tr_peerGetPieceSpeed( const tr_peer    * peer,
1765                      tr_direction       direction )
1766{
1767    assert( peer );
1768    assert( direction==TR_CLIENT_TO_PEER || direction==TR_PEER_TO_CLIENT );
1769
1770    return tr_rcRate( peer->pieceSpeed[direction] );
1771}
1772
1773
1774struct tr_peer_stat *
1775tr_peerMgrPeerStats( const   tr_peerMgr  * manager,
1776                     const   uint8_t     * torrentHash,
1777                     int                 * setmeCount UNUSED )
1778{
1779    int             i, size;
1780    const Torrent * t;
1781    tr_peer **      peers;
1782    tr_peer_stat *  ret;
1783
1784    assert( manager );
1785    managerLock( manager );
1786
1787    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1788    peers = getConnectedPeers( (Torrent*)t, &size );
1789    ret = tr_new0( tr_peer_stat, size );
1790
1791    for( i = 0; i < size; ++i )
1792    {
1793        char *                   pch;
1794        const tr_peer *          peer = peers[i];
1795        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1796        tr_peer_stat *           stat = ret + i;
1797
1798        tr_netNtop( &peer->in_addr, stat->addr, sizeof( stat->addr ) );
1799        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
1800                   sizeof( stat->client ) );
1801        stat->port               = peer->port;
1802        stat->from               = atom->from;
1803        stat->progress           = peer->progress;
1804        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1805        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER );
1806        stat->rateToClient       = tr_peerGetPieceSpeed( peer, TR_PEER_TO_CLIENT );
1807        stat->peerIsChoked       = peer->peerIsChoked;
1808        stat->peerIsInterested   = peer->peerIsInterested;
1809        stat->clientIsChoked     = peer->clientIsChoked;
1810        stat->clientIsInterested = peer->clientIsInterested;
1811        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1812        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1813        stat->isUploadingTo      = clientIsUploadingTo( peer );
1814
1815        pch = stat->flagStr;
1816        if( t->optimistic == peer ) *pch++ = 'O';
1817        if( stat->isDownloadingFrom ) *pch++ = 'D';
1818        else if( stat->clientIsInterested ) *pch++ = 'd';
1819        if( stat->isUploadingTo ) *pch++ = 'U';
1820        else if( stat->peerIsInterested ) *pch++ = 'u';
1821        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ =
1822                'K';
1823        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1824        if( stat->isEncrypted ) *pch++ = 'E';
1825        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1826        if( stat->isIncoming ) *pch++ = 'I';
1827        *pch = '\0';
1828    }
1829
1830    *setmeCount = size;
1831    tr_free( peers );
1832
1833    managerUnlock( manager );
1834    return ret;
1835}
1836
1837/**
1838***
1839**/
1840
1841struct ChokeData
1842{
1843    unsigned int    doUnchoke    : 1;
1844    unsigned int    isInterested : 1;
1845    double          rate;
1846    tr_peer *       peer;
1847};
1848
1849static int
1850compareChoke( const void * va,
1851              const void * vb )
1852{
1853    const struct ChokeData * a = va;
1854    const struct ChokeData * b = vb;
1855    int                      diff = 0;
1856
1857    if( diff == 0 ) /* prefer higher overall speeds */
1858        diff = a->rate > b->rate ? -1 : 1;
1859
1860    if( diff == 0 ) /* prefer unchoked */
1861        diff = (int)a->peer->peerIsChoked - (int)b->peer->peerIsChoked;
1862
1863    return diff;
1864}
1865
1866static int
1867isNew( const tr_peer * peer )
1868{
1869    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1870}
1871
1872static int
1873isSame( const tr_peer * peer )
1874{
1875    return peer && peer->client && strstr( peer->client, "Transmission" );
1876}
1877
1878/**
1879***
1880**/
1881
1882static void
1883rechoke( Torrent * t )
1884{
1885    int                i, peerCount, size, unchokedInterested;
1886    tr_peer **         peers = getConnectedPeers( t, &peerCount );
1887    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1888    const int          chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
1889
1890    assert( torrentIsLocked( t ) );
1891
1892    /* sort the peers by preference and rate */
1893    for( i = 0, size = 0; i < peerCount; ++i )
1894    {
1895        tr_peer * peer = peers[i];
1896        if( peer->progress >= 1.0 ) /* choke all seeds */
1897            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1898        else if( chokeAll )
1899            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1900        else {
1901            struct ChokeData * n = &choke[size++];
1902            n->peer         = peer;
1903            n->isInterested = peer->peerIsInterested;
1904            n->rate         = tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER )
1905                            + tr_peerGetPieceSpeed( peer, TR_PEER_TO_CLIENT );
1906        }
1907    }
1908
1909    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
1910
1911    /**
1912     * Reciprocation and number of uploads capping is managed by unchoking
1913     * the N peers which have the best upload rate and are interested.
1914     * This maximizes the client's download rate. These N peers are
1915     * referred to as downloaders, because they are interested in downloading
1916     * from the client.
1917     *
1918     * Peers which have a better upload rate (as compared to the downloaders)
1919     * but aren't interested get unchoked. If they become interested, the
1920     * downloader with the worst upload rate gets choked. If a client has
1921     * a complete file, it uses its upload rate rather than its download
1922     * rate to decide which peers to unchoke.
1923     */
1924    unchokedInterested = 0;
1925    for( i = 0; i < size && unchokedInterested < MAX_UNCHOKED_PEERS; ++i )
1926    {
1927        choke[i].doUnchoke = 1;
1928        if( choke[i].isInterested )
1929            ++unchokedInterested;
1930    }
1931
1932    /* optimistic unchoke */
1933    if( i < size )
1934    {
1935        int                n;
1936        struct ChokeData * c;
1937        tr_ptrArray *      randPool = tr_ptrArrayNew( );
1938
1939        for( ; i < size; ++i )
1940        {
1941            if( choke[i].isInterested )
1942            {
1943                const tr_peer * peer = choke[i].peer;
1944                int             x = 1, y;
1945                if( isNew( peer ) ) x *= 3;
1946                if( isSame( peer ) ) x *= 3;
1947                for( y = 0; y < x; ++y )
1948                    tr_ptrArrayAppend( randPool, &choke[i] );
1949            }
1950        }
1951
1952        if( ( n = tr_ptrArraySize( randPool ) ) )
1953        {
1954            c = tr_ptrArrayNth( randPool, tr_cryptoWeakRandInt( n ) );
1955            c->doUnchoke = 1;
1956            t->optimistic = c->peer;
1957        }
1958
1959        tr_ptrArrayFree( randPool, NULL );
1960    }
1961
1962    for( i = 0; i < size; ++i )
1963        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1964
1965    /* cleanup */
1966    tr_free( choke );
1967    tr_free( peers );
1968}
1969
1970static int
1971rechokePulse( void * vtorrent )
1972{
1973    Torrent * t = vtorrent;
1974
1975    torrentLock( t );
1976    rechoke( t );
1977    torrentUnlock( t );
1978    return TRUE;
1979}
1980
1981/***
1982****
1983****  Life and Death
1984****
1985***/
1986
1987static int
1988shouldPeerBeClosed( const Torrent * t,
1989                    const tr_peer * peer,
1990                    int             peerCount )
1991{
1992    const tr_torrent *       tor = t->tor;
1993    const time_t             now = time( NULL );
1994    const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1995
1996    /* if it's marked for purging, close it */
1997    if( peer->doPurge )
1998    {
1999        tordbg( t, "purging peer %s because its doPurge flag is set",
2000               tr_peerIoAddrStr( &atom->addr,
2001                                 atom->port ) );
2002        return TRUE;
2003    }
2004
2005    /* if we're seeding and the peer has everything we have,
2006     * and enough time has passed for a pex exchange, then disconnect */
2007    if( tr_torrentIsSeed( tor ) )
2008    {
2009        int peerHasEverything;
2010        if( atom->flags & ADDED_F_SEED_FLAG )
2011            peerHasEverything = TRUE;
2012        else if( peer->progress < tr_cpPercentDone( tor->completion ) )
2013            peerHasEverything = FALSE;
2014        else
2015        {
2016            tr_bitfield * tmp =
2017                tr_bitfieldDup( tr_cpPieceBitfield( tor->completion ) );
2018            tr_bitfieldDifference( tmp, peer->have );
2019            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2020            tr_bitfieldFree( tmp );
2021        }
2022        if( peerHasEverything
2023          && ( !tr_torrentAllowsPex( tor ) || ( now - atom->time >= 30 ) ) )
2024        {
2025            tordbg( t, "purging peer %s because we're both seeds",
2026                   tr_peerIoAddrStr( &atom->addr,
2027                                     atom->port ) );
2028            return TRUE;
2029        }
2030    }
2031
2032    /* disconnect if it's been too long since piece data has been transferred.
2033     * this is on a sliding scale based on number of available peers... */
2034    {
2035        const int    relaxStrictnessIfFewerThanN =
2036            (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2037        /* if we have >= relaxIfFewerThan, strictness is 100%.
2038         * if we have zero connections, strictness is 0% */
2039        const float  strictness = peerCount >= relaxStrictnessIfFewerThanN
2040                                  ? 1.0
2041                                  : peerCount /
2042                                  (float)relaxStrictnessIfFewerThanN;
2043        const int    lo = MIN_UPLOAD_IDLE_SECS;
2044        const int    hi = MAX_UPLOAD_IDLE_SECS;
2045        const int    limit = lo + ( ( hi - lo ) * strictness );
2046        const time_t then = peer->pieceDataActivityDate;
2047        const int    idleTime = then ? ( now - then ) : 0;
2048        if( idleTime > limit )
2049        {
2050            tordbg(
2051                t,
2052                "purging peer %s because it's been %d secs since we shared anything",
2053                tr_peerIoAddrStr( &atom->addr, atom->port ), idleTime );
2054            return TRUE;
2055        }
2056    }
2057
2058    return FALSE;
2059}
2060
2061static tr_peer **
2062getPeersToClose( Torrent * t,
2063                 int *     setmeSize )
2064{
2065    int               i, peerCount, outsize;
2066    tr_peer **        peers = (tr_peer**) tr_ptrArrayPeek( t->peers,
2067                                                           &peerCount );
2068    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2069
2070    assert( torrentIsLocked( t ) );
2071
2072    for( i = outsize = 0; i < peerCount; ++i )
2073        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
2074            ret[outsize++] = peers[i];
2075
2076    *setmeSize = outsize;
2077    return ret;
2078}
2079
2080static int
2081compareCandidates( const void * va,
2082                   const void * vb )
2083{
2084    const struct peer_atom * a = *(const struct peer_atom**) va;
2085    const struct peer_atom * b = *(const struct peer_atom**) vb;
2086
2087    /* <Charles> Here we would probably want to try reconnecting to
2088     * peers that had most recently given us data. Lots of users have
2089     * trouble with resets due to their routers and/or ISPs. This way we
2090     * can quickly recover from an unwanted reset. So we sort
2091     * piece_data_time in descending order.
2092     */
2093
2094    if( a->piece_data_time != b->piece_data_time )
2095        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2096
2097    if( a->numFails != b->numFails )
2098        return a->numFails < b->numFails ? -1 : 1;
2099
2100    if( a->time != b->time )
2101        return a->time < b->time ? -1 : 1;
2102
2103    return 0;
2104}
2105
2106static int
2107getReconnectIntervalSecs( const struct peer_atom * atom )
2108{
2109    int          sec;
2110    const time_t now = time( NULL );
2111
2112    /* if we were recently connected to this peer and transferring piece
2113     * data, try to reconnect to them sooner rather that later -- we don't
2114     * want network troubles to get in the way of a good peer. */
2115    if( ( now - atom->piece_data_time ) <=
2116       ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2117        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2118
2119    /* don't allow reconnects more often than our minimum */
2120    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2121        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2122
2123    /* otherwise, the interval depends on how many times we've tried
2124     * and failed to connect to the peer */
2125    else switch( atom->numFails )
2126        {
2127            case 0:
2128                sec = 0; break;
2129
2130            case 1:
2131                sec = 5; break;
2132
2133            case 2:
2134                sec = 2 * 60; break;
2135
2136            case 3:
2137                sec = 15 * 60; break;
2138
2139            case 4:
2140                sec = 30 * 60; break;
2141
2142            case 5:
2143                sec = 60 * 60; break;
2144
2145            default:
2146                sec = 120 * 60; break;
2147        }
2148
2149    return sec;
2150}
2151
2152static struct peer_atom **
2153getPeerCandidates(                               Torrent * t,
2154                                           int * setmeSize )
2155{
2156    int                 i, atomCount, retCount;
2157    struct peer_atom ** atoms;
2158    struct peer_atom ** ret;
2159    const time_t        now = time( NULL );
2160    const int           seed = tr_torrentIsSeed( t->tor );
2161
2162    assert( torrentIsLocked( t ) );
2163
2164    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &atomCount );
2165    ret = tr_new( struct peer_atom*, atomCount );
2166    for( i = retCount = 0; i < atomCount; ++i )
2167    {
2168        int                interval;
2169        struct peer_atom * atom = atoms[i];
2170
2171        /* peer fed us too much bad data ... we only keep it around
2172         * now to weed it out in case someone sends it to us via pex */
2173        if( atom->myflags & MYFLAG_BANNED )
2174            continue;
2175
2176        /* peer was unconnectable before, so we're not going to keep trying.
2177         * this is needs a separate flag from `banned', since if they try
2178         * to connect to us later, we'll let them in */
2179        if( atom->myflags & MYFLAG_UNREACHABLE )
2180            continue;
2181
2182        /* we don't need two connections to the same peer... */
2183        if( peerIsInUse( t, &atom->addr ) )
2184            continue;
2185
2186        /* no need to connect if we're both seeds... */
2187        if( seed && ( atom->flags & ADDED_F_SEED_FLAG ) )
2188            continue;
2189
2190        /* don't reconnect too often */
2191        interval = getReconnectIntervalSecs( atom );
2192        if( ( now - atom->time ) < interval )
2193        {
2194            tordbg(
2195                t,
2196                "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2197                i, tr_peerIoAddrStr( &atom->addr,
2198                                     atom->port ), interval );
2199            continue;
2200        }
2201
2202        /* Don't connect to peers in our blocklist */
2203        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2204            continue;
2205
2206        ret[retCount++] = atom;
2207    }
2208
2209    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2210    *setmeSize = retCount;
2211    return ret;
2212}
2213
2214static int
2215reconnectPulse( void * vtorrent )
2216{
2217    Torrent *     t = vtorrent;
2218    static time_t prevTime = 0;
2219    static int    newConnectionsThisSecond = 0;
2220    time_t        now;
2221
2222    torrentLock( t );
2223
2224    now = time( NULL );
2225    if( prevTime != now )
2226    {
2227        prevTime = now;
2228        newConnectionsThisSecond = 0;
2229    }
2230
2231    if( !t->isRunning )
2232    {
2233        removeAllPeers( t );
2234    }
2235    else
2236    {
2237        int                 i, nCandidates, nBad;
2238        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
2239        struct tr_peer **   connections = getPeersToClose( t, &nBad );
2240
2241        if( nBad || nCandidates )
2242            tordbg(
2243                t, "reconnect pulse for [%s]: %d bad connections, "
2244                   "%d connection candidates, %d atoms, max per pulse is %d",
2245                t->tor->info.name, nBad, nCandidates,
2246                tr_ptrArraySize( t->pool ),
2247                (int)MAX_RECONNECTIONS_PER_PULSE );
2248
2249        /* disconnect some peers.
2250           if we transferred piece data, then they might be good peers,
2251           so reset their `numFails' weight to zero.  otherwise we connected
2252           to them fruitlessly, so mark it as another fail */
2253        for( i = 0; i < nBad; ++i )
2254        {
2255            tr_peer *          peer = connections[i];
2256            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
2257            if( peer->pieceDataActivityDate )
2258                atom->numFails = 0;
2259            else
2260                ++atom->numFails;
2261            tordbg( t, "removing bad peer %s",
2262                   tr_peerIoGetAddrStr( peer->io ) );
2263            removePeer( t, peer );
2264        }
2265
2266        /* add some new ones */
2267        for( i = 0;    ( i < nCandidates )
2268           && ( i < MAX_RECONNECTIONS_PER_PULSE )
2269           && ( getPeerCount( t ) < getMaxPeerCount( t->tor ) )
2270           && ( newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND );
2271             ++i )
2272        {
2273            tr_peerMgr *       mgr = t->manager;
2274            struct peer_atom * atom = candidates[i];
2275            tr_peerIo *        io;
2276
2277            tordbg( t, "Starting an OUTGOING connection with %s",
2278                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2279
2280            io =
2281                tr_peerIoNewOutgoing( mgr->session, &atom->addr, atom->port,
2282                                      t->hash );
2283            if( io == NULL )
2284            {
2285                atom->myflags |= MYFLAG_UNREACHABLE;
2286            }
2287            else
2288            {
2289                tr_handshake * handshake = tr_handshakeNew(
2290                    io,
2291                    mgr->session->
2292                    encryptionMode,
2293                    myHandshakeDoneCB,
2294                    mgr );
2295
2296                assert( tr_peerIoGetTorrentHash( io ) );
2297
2298                ++newConnectionsThisSecond;
2299
2300                tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake,
2301                                         handshakeCompare );
2302            }
2303
2304            atom->time = time( NULL );
2305        }
2306
2307        /* cleanup */
2308        tr_free( connections );
2309        tr_free( candidates );
2310    }
2311
2312    torrentUnlock( t );
2313    return TRUE;
2314}
2315
2316/****
2317*****
2318*****  BANDWIDTH ALLOCATION
2319*****
2320****/
2321
2322static double
2323allocateHowMuch( double                  desired_average_kb_per_sec,
2324                 const tr_ratecontrol  * ratecontrol )
2325{
2326    const int pulses_per_history = TR_RATECONTROL_HISTORY_MSEC / BANDWIDTH_PERIOD_MSEC;
2327    const double seconds_per_pulse = BANDWIDTH_PERIOD_MSEC / 1000.0;
2328    const double baseline_bytes_per_pulse = desired_average_kb_per_sec * 1024.0 * seconds_per_pulse;
2329    const double min = baseline_bytes_per_pulse * 0.80;
2330    const double max = baseline_bytes_per_pulse * 1.10;
2331    const double current_bytes_per_pulse = tr_rcRate( ratecontrol ) * 1024.0 * seconds_per_pulse;
2332    const double next_pulse_bytes = baseline_bytes_per_pulse * ( pulses_per_history + 1 )
2333                                  - ( current_bytes_per_pulse * pulses_per_history );
2334    double clamped;
2335
2336    /* clamp the return value to lessen oscillation */
2337    clamped = next_pulse_bytes;
2338    clamped = MAX( clamped, min );
2339    clamped = MIN( clamped, max );
2340
2341#if 0
2342fprintf( stderr, "desiredAvgKB is %5.2f, rate is %5.2f, allocating %5.2f (%5.2f)\n",
2343         desired_average_kb_per_sec,
2344         tr_rcRate( ratecontrol ),
2345         clamped/1024.0,
2346         next_pulse_bytes/1024.0 );
2347#endif
2348
2349    return clamped;
2350}
2351
2352/**
2353 * Distributes a fixed amount of bandwidth among a set of peers.
2354 *
2355 * @param peerArray peers whose client-to-peer bandwidth will be set
2356 * @param direction whether to allocate upload or download bandwidth
2357 * @param history recent bandwidth history for these peers
2358 * @param desiredAvgKB overall bandwidth goal for this set of peers
2359 */
2360static void
2361setPeerBandwidth( tr_ptrArray          * peerArray,
2362                  const tr_direction     direction,
2363                  const tr_ratecontrol * ratecontrol,
2364                  double                 desiredAvgKB )
2365{
2366    const int    peerCount = tr_ptrArraySize( peerArray );
2367    const double bytes = allocateHowMuch( desiredAvgKB, ratecontrol );
2368    const double welfareBytes = MIN( 2048, bytes * 0.2 );
2369    const double meritBytes = MAX( 0, bytes - welfareBytes );
2370    tr_peer **   peers = (tr_peer**) tr_ptrArrayBase( peerArray );
2371    tr_peer **   candidates = tr_new( tr_peer *, peerCount );
2372    int          i;
2373    int          candidateCount;
2374    double       welfare;
2375    size_t       bytesUsed;
2376
2377    assert( meritBytes >= 0.0 );
2378    assert( welfareBytes >= 0.0 );
2379    assert( direction == TR_UP || direction == TR_DOWN );
2380
2381    for( i = candidateCount = 0; i < peerCount; ++i )
2382        if( tr_peerIoWantsBandwidth( peers[i]->io, direction ) )
2383            candidates[candidateCount++] = peers[i];
2384        else
2385            tr_peerIoSetBandwidth( peers[i]->io, direction, 0 );
2386
2387    for( i = bytesUsed = 0; i < candidateCount; ++i )
2388        bytesUsed += tr_peerIoGetBandwidthUsed( candidates[i]->io,
2389                                                direction );
2390
2391    welfare = welfareBytes / candidateCount;
2392
2393    for( i = 0; i < candidateCount; ++i )
2394    {
2395        tr_peer *    peer = candidates[i];
2396        const double merit = bytesUsed
2397                             ? ( meritBytes *
2398                                tr_peerIoGetBandwidthUsed( peer->io,
2399                                                           direction ) ) /
2400                             bytesUsed
2401                             : ( meritBytes / candidateCount );
2402        tr_peerIoSetBandwidth( peer->io, direction, merit + welfare );
2403    }
2404
2405    /* cleanup */
2406    tr_free( candidates );
2407}
2408
2409static size_t
2410countHandshakeBandwidth( tr_ptrArray * handshakes,
2411                         tr_direction  direction )
2412{
2413    const int n = tr_ptrArraySize( handshakes );
2414    int       i;
2415    size_t    total;
2416
2417    for( i = total = 0; i < n; ++i )
2418    {
2419        tr_peerIo * io = tr_handshakeGetIO( tr_ptrArrayNth( handshakes, i ) );
2420        total += tr_peerIoGetBandwidthUsed( io, direction );
2421    }
2422    return total;
2423}
2424
2425static size_t
2426countPeerBandwidth( tr_ptrArray * peers,
2427                    tr_direction  direction )
2428{
2429    const int n = tr_ptrArraySize( peers );
2430    int       i;
2431    size_t    total;
2432
2433    for( i = total = 0; i < n; ++i )
2434    {
2435        tr_peer * peer = tr_ptrArrayNth( peers, i );
2436        total += tr_peerIoGetBandwidthUsed( peer->io, direction );
2437    }
2438    return total;
2439}
2440
2441static void
2442givePeersUnlimitedBandwidth( tr_ptrArray * peers,
2443                             tr_direction  direction )
2444{
2445    const int n = tr_ptrArraySize( peers );
2446    int       i;
2447
2448    for( i = 0; i < n; ++i )
2449    {
2450        tr_peer * peer = tr_ptrArrayNth( peers, i );
2451        tr_peerIoSetBandwidthUnlimited( peer->io, direction );
2452    }
2453}
2454
2455static void
2456pumpAllPeers( tr_peerMgr * mgr )
2457{
2458    const int torrentCount = tr_ptrArraySize( mgr->torrents );
2459    int       i, j;
2460
2461    for( i = 0; i < torrentCount; ++i )
2462    {
2463        Torrent * t = tr_ptrArrayNth( mgr->torrents, i );
2464        for( j = 0; j < tr_ptrArraySize( t->peers ); ++j )
2465        {
2466            tr_peer * peer = tr_ptrArrayNth( t->peers, j );
2467            tr_peerMsgsPulse( peer->msgs );
2468        }
2469    }
2470}
2471
2472/**
2473 * Allocate bandwidth for each peer connection.
2474 *
2475 * @param mgr the peer manager
2476 * @param direction whether to allocate upload or download bandwidth
2477 * @return the amount of directional bandwidth used since the last pulse.
2478 */
2479static double
2480allocateBandwidth( tr_peerMgr * mgr,
2481                   tr_direction direction )
2482{
2483    tr_session *  session = mgr->session;
2484    const int     torrentCount = tr_ptrArraySize( mgr->torrents );
2485    Torrent **    torrents = (Torrent **) tr_ptrArrayBase( mgr->torrents );
2486    tr_ptrArray * globalPool = tr_ptrArrayNew( );
2487    double        allBytesUsed = 0;
2488    size_t        poolBytesUsed = 0;
2489    int           i;
2490
2491    assert( mgr );
2492    assert( direction == TR_UP || direction == TR_DOWN );
2493
2494    /* before allocating bandwidth, pump the connected peers */
2495    pumpAllPeers( mgr );
2496
2497    for( i=0; i<torrentCount; ++i )
2498    {
2499        Torrent * t = torrents[i];
2500        size_t used;
2501        tr_speedlimit speedMode;
2502
2503        /* no point in allocating bandwidth for stopped torrents */
2504        if( tr_torrentGetActivity( t->tor ) == TR_STATUS_STOPPED )
2505            continue;
2506
2507        used = countPeerBandwidth( t->peers, direction );
2508        countHandshakeBandwidth( t->outgoingHandshakes, direction );
2509
2510        /* remember this torrent's bytes used */
2511        tr_rcTransferred( t->tor->rawSpeed[direction], used );
2512
2513        /* add this torrent's bandwidth use to allBytesUsed */
2514        allBytesUsed += used;
2515
2516        /* if piece data is disallowed, don't bother limiting bandwidth --
2517         * we won't be asking for, or sending out, any pieces */
2518        if( !tr_torrentIsPieceTransferAllowed( t->tor, direction ) )
2519            speedMode = TR_SPEEDLIMIT_UNLIMITED;
2520        else
2521            speedMode = tr_torrentGetSpeedMode( t->tor, direction );
2522           
2523        /* process the torrent's peers based on its speed mode */
2524        switch( speedMode )
2525        {
2526            case TR_SPEEDLIMIT_UNLIMITED:
2527                givePeersUnlimitedBandwidth( t->peers, direction );
2528                break;
2529
2530            case TR_SPEEDLIMIT_SINGLE:
2531                setPeerBandwidth( t->peers, direction,
2532                                  t->tor->rawSpeed[direction],
2533                                  tr_torrentGetSpeedLimit( t->tor, direction ) );
2534                break;
2535
2536            case TR_SPEEDLIMIT_GLOBAL:
2537            {
2538                int       i;
2539                const int n = tr_ptrArraySize( t->peers );
2540                for( i = 0; i < n; ++i )
2541                    tr_ptrArrayAppend( globalPool,
2542                                      tr_ptrArrayNth( t->peers, i ) );
2543                poolBytesUsed += used;
2544                break;
2545            }
2546        }
2547    }
2548
2549    /* add incoming handshakes to the global pool */
2550    i = countHandshakeBandwidth( mgr->incomingHandshakes, direction );
2551    allBytesUsed += i;
2552    poolBytesUsed += i;
2553
2554    tr_rcTransferred( mgr->globalPoolRawSpeed[direction], poolBytesUsed );
2555
2556    /* handle the global pool's connections */
2557    if( !tr_sessionIsSpeedLimitEnabled( session, direction ) )
2558        givePeersUnlimitedBandwidth( globalPool, direction );
2559    else
2560        setPeerBandwidth( globalPool, direction,
2561                          mgr->globalPoolRawSpeed[direction],
2562                          tr_sessionGetSpeedLimit( session, direction ) );
2563
2564    /* now that we've allocated bandwidth, pump all the connected peers */
2565    pumpAllPeers( mgr );
2566
2567    /* cleanup */
2568    tr_ptrArrayFree( globalPool, NULL );
2569    return allBytesUsed;
2570}
2571
2572static int
2573bandwidthPulse( void * vmgr )
2574{
2575    tr_peerMgr * mgr = vmgr;
2576    int          i;
2577
2578    managerLock( mgr );
2579
2580    /* allocate the upload and download bandwidth */
2581    for( i = 0; i < 2; ++i )
2582        allocateBandwidth( mgr, i );
2583
2584    managerUnlock( mgr );
2585    return TRUE;
2586}
2587
Note: See TracBrowser for help on using the repository browser.