source: trunk/libtransmission/peer-mgr.c @ 7137

Last change on this file since 7137 was 7137, checked in by charles, 13 years ago

(libT) #1468: speed display is very jumpy

  • Property svn:keywords set to Date Rev Author Id
File size: 74.6 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 7137 2008-11-23 16:31:28Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h> /* memcpy, memcmp, strstr */
16#include <stdlib.h> /* qsort */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "blocklist.h"
23#include "clients.h"
24#include "completion.h"
25#include "crypto.h"
26#include "handshake.h"
27#include "inout.h" /* tr_ioTestPiece */
28#include "net.h"
29#include "peer-io.h"
30#include "peer-mgr.h"
31#include "peer-mgr-private.h"
32#include "peer-msgs.h"
33#include "ptrarray.h"
34#include "ratecontrol.h"
35#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
36#include "torrent.h"
37#include "trevent.h"
38#include "utils.h"
39#include "webseed.h"
40
41enum
42{
43    /* how frequently to change which peers are choked */
44    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
45
46    /* minimum interval for refilling peers' request lists */
47    REFILL_PERIOD_MSEC = 333,
48
49    /* when many peers are available, keep idle ones this long */
50    MIN_UPLOAD_IDLE_SECS = ( 60 * 3 ),
51
52    /* when few peers are available, keep idle ones this long */
53    MAX_UPLOAD_IDLE_SECS = ( 60 * 10 ),
54
55    /* how frequently to decide which peers live and die */
56    RECONNECT_PERIOD_MSEC = ( 2 * 1000 ),
57   
58    /* how frequently to reallocate bandwidth */
59    BANDWIDTH_PERIOD_MSEC = 250,
60
61    /* max # of peers to ask fer per torrent per reconnect pulse */
62    MAX_RECONNECTIONS_PER_PULSE = 2,
63
64    /* max number of peers to ask for per second overall.
65    * this throttle is to avoid overloading the router */
66    MAX_CONNECTIONS_PER_SECOND = 4,
67
68    /* number of unchoked peers per torrent.
69     * FIXME: this probably ought to be configurable */
70    MAX_UNCHOKED_PEERS = 14,
71
72    /* number of bad pieces a peer is allowed to send before we ban them */
73    MAX_BAD_PIECES_PER_PEER = 3,
74
75    /* use for bitwise operations w/peer_atom.myflags */
76    MYFLAG_BANNED = 1,
77
78    /* unreachable for now... but not banned.
79     * if they try to connect to us it's okay */
80    MYFLAG_UNREACHABLE = 2,
81
82    /* the minimum we'll wait before attempting to reconnect to a peer */
83    MINIMUM_RECONNECT_INTERVAL_SECS = 5
84};
85
86
87/**
88***
89**/
90
91/* We keep one of these for every peer we know about, whether
92 * it's connected or not, so the struct must be small.
93 * When our current connections underperform, we dip back
94 * into this list for new ones. */
95struct peer_atom
96{
97    uint8_t           from;
98    uint8_t           flags; /* these match the added_f flags */
99    uint8_t           myflags; /* flags that aren't defined in added_f */
100    uint16_t          port;
101    uint16_t          numFails;
102    struct in_addr    addr;
103    time_t            time; /* when the peer's connection status last changed */
104    time_t            piece_data_time;
105};
106
107typedef struct
108{
109    unsigned int    isRunning : 1;
110
111    uint8_t         hash[SHA_DIGEST_LENGTH];
112    int         *   pendingRequestCount;
113    tr_ptrArray *   outgoingHandshakes; /* tr_handshake */
114    tr_ptrArray *   pool; /* struct peer_atom */
115    tr_ptrArray *   peers; /* tr_peer */
116    tr_ptrArray *   webseeds; /* tr_webseed */
117    tr_timer *      reconnectTimer;
118    tr_timer *      rechokeTimer;
119    tr_timer *      refillTimer;
120    tr_torrent *    tor;
121    tr_peer *       optimistic; /* the optimistic peer, or NULL if none */
122
123    struct tr_peerMgr * manager;
124}
125Torrent;
126
127struct tr_peerMgr
128{
129    tr_session      * session;
130    tr_ptrArray     * torrents; /* Torrent */
131    tr_ptrArray     * incomingHandshakes; /* tr_handshake */
132    tr_timer        * bandwidthTimer;
133};
134
135#define tordbg( t, ... ) \
136    do { \
137        if( tr_deepLoggingIsActive( ) ) \
138            tr_deepLog( __FILE__, __LINE__, t->tor->info.name, __VA_ARGS__ ); \
139    } while( 0 )
140
141#define dbgmsg( ... ) \
142    do { \
143        if( tr_deepLoggingIsActive( ) ) \
144            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
145    } while( 0 )
146
147/**
148***
149**/
150
151static void
152managerLock( const struct tr_peerMgr * manager )
153{
154    tr_globalLock( manager->session );
155}
156
157static void
158managerUnlock( const struct tr_peerMgr * manager )
159{
160    tr_globalUnlock( manager->session );
161}
162
163static void
164torrentLock( Torrent * torrent )
165{
166    managerLock( torrent->manager );
167}
168
169static void
170torrentUnlock( Torrent * torrent )
171{
172    managerUnlock( torrent->manager );
173}
174
175static int
176torrentIsLocked( const Torrent * t )
177{
178    return tr_globalIsLocked( t->manager->session );
179}
180
181/**
182***
183**/
184
185static int
186compareAddresses( const struct in_addr * a,
187                  const struct in_addr * b )
188{
189    if( a->s_addr != b->s_addr )
190        return a->s_addr < b->s_addr ? -1 : 1;
191
192    return 0;
193}
194
195static int
196handshakeCompareToAddr( const void * va,
197                        const void * vb )
198{
199    const tr_handshake * a = va;
200
201    return compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
202}
203
204static int
205handshakeCompare( const void * a,
206                  const void * b )
207{
208    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
209}
210
211static tr_handshake*
212getExistingHandshake( tr_ptrArray *          handshakes,
213                      const struct in_addr * in_addr )
214{
215    return tr_ptrArrayFindSorted( handshakes,
216                                  in_addr,
217                                  handshakeCompareToAddr );
218}
219
220static int
221comparePeerAtomToAddress( const void * va,
222                          const void * vb )
223{
224    const struct peer_atom * a = va;
225
226    return compareAddresses( &a->addr, vb );
227}
228
229static int
230comparePeerAtoms( const void * va,
231                  const void * vb )
232{
233    const struct peer_atom * b = vb;
234
235    return comparePeerAtomToAddress( va, &b->addr );
236}
237
238/**
239***
240**/
241
242static int
243torrentCompare( const void * va,
244                const void * vb )
245{
246    const Torrent * a = va;
247    const Torrent * b = vb;
248
249    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
250}
251
252static int
253torrentCompareToHash( const void * va,
254                      const void * vb )
255{
256    const Torrent * a = va;
257    const uint8_t * b_hash = vb;
258
259    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
260}
261
262static Torrent*
263getExistingTorrent( tr_peerMgr *    manager,
264                    const uint8_t * hash )
265{
266    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
267                                             hash,
268                                             torrentCompareToHash );
269}
270
271static int
272peerCompare( const void * va,
273             const void * vb )
274{
275    const tr_peer * a = va;
276    const tr_peer * b = vb;
277
278    return compareAddresses( &a->in_addr, &b->in_addr );
279}
280
281static int
282peerCompareToAddr( const void * va,
283                   const void * vb )
284{
285    const tr_peer * a = va;
286
287    return compareAddresses( &a->in_addr, vb );
288}
289
290static tr_peer*
291getExistingPeer( Torrent *              torrent,
292                 const struct in_addr * in_addr )
293{
294    assert( torrentIsLocked( torrent ) );
295    assert( in_addr );
296
297    return tr_ptrArrayFindSorted( torrent->peers,
298                                  in_addr,
299                                  peerCompareToAddr );
300}
301
302static struct peer_atom*
303getExistingAtom( const                  Torrent * t,
304                 const struct in_addr * addr )
305{
306    assert( torrentIsLocked( t ) );
307    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
308}
309
310static int
311peerIsInUse( const Torrent *        ct,
312             const struct in_addr * addr )
313{
314    Torrent * t = (Torrent*) ct;
315
316    assert( torrentIsLocked ( t ) );
317
318    return getExistingPeer( t, addr )
319           || getExistingHandshake( t->outgoingHandshakes, addr )
320           || getExistingHandshake( t->manager->incomingHandshakes, addr );
321}
322
323static tr_peer*
324peerConstructor( const struct in_addr * in_addr )
325{
326    tr_peer * p;
327
328    p = tr_new0( tr_peer, 1 );
329    memcpy( &p->in_addr, in_addr, sizeof( struct in_addr ) );
330    p->pieceSpeed[TR_CLIENT_TO_PEER] = tr_rcInit( );
331    p->pieceSpeed[TR_PEER_TO_CLIENT] = tr_rcInit( );
332    p->rawSpeed[TR_CLIENT_TO_PEER] = tr_rcInit( );
333    p->rawSpeed[TR_PEER_TO_CLIENT] = tr_rcInit( );
334    return p;
335}
336
337static tr_peer*
338getPeer( Torrent *              torrent,
339         const struct in_addr * in_addr )
340{
341    tr_peer * peer;
342
343    assert( torrentIsLocked( torrent ) );
344
345    peer = getExistingPeer( torrent, in_addr );
346
347    if( peer == NULL )
348    {
349        peer = peerConstructor( in_addr );
350        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
351    }
352
353    return peer;
354}
355
356static void
357peerDestructor( tr_peer * peer )
358{
359    assert( peer );
360    assert( peer->msgs );
361
362    tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
363    tr_peerMsgsFree( peer->msgs );
364
365    tr_peerIoFree( peer->io );
366
367    tr_bitfieldFree( peer->have );
368    tr_bitfieldFree( peer->blame );
369    tr_free( peer->client );
370
371    tr_rcClose( peer->rawSpeed[TR_CLIENT_TO_PEER] );
372    tr_rcClose( peer->rawSpeed[TR_PEER_TO_CLIENT] );
373    tr_rcClose( peer->pieceSpeed[TR_CLIENT_TO_PEER] );
374    tr_rcClose( peer->pieceSpeed[TR_PEER_TO_CLIENT] );
375    tr_free( peer );
376}
377
378static void
379removePeer( Torrent * t,
380            tr_peer * peer )
381{
382    tr_peer *          removed;
383    struct peer_atom * atom;
384
385    assert( torrentIsLocked( t ) );
386
387    atom = getExistingAtom( t, &peer->in_addr );
388    assert( atom );
389    atom->time = time( NULL );
390
391    removed = tr_ptrArrayRemoveSorted( t->peers, peer, peerCompare );
392    assert( removed == peer );
393    peerDestructor( removed );
394}
395
396static void
397removeAllPeers( Torrent * t )
398{
399    while( !tr_ptrArrayEmpty( t->peers ) )
400        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
401}
402
403static void
404torrentDestructor( void * vt )
405{
406    Torrent * t = vt;
407    uint8_t   hash[SHA_DIGEST_LENGTH];
408
409    assert( t );
410    assert( !t->isRunning );
411    assert( t->peers );
412    assert( torrentIsLocked( t ) );
413    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
414    assert( tr_ptrArrayEmpty( t->peers ) );
415
416    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
417
418    tr_timerFree( &t->reconnectTimer );
419    tr_timerFree( &t->rechokeTimer );
420    tr_timerFree( &t->refillTimer );
421
422    tr_ptrArrayFree( t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
423    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
424    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
425    tr_ptrArrayFree( t->peers, NULL );
426
427    tr_free( t->pendingRequestCount );
428    tr_free( t );
429}
430
431static void peerCallbackFunc( void * vpeer,
432                              void * vevent,
433                              void * vt );
434
435static Torrent*
436torrentConstructor( tr_peerMgr * manager,
437                    tr_torrent * tor )
438{
439    int       i;
440    Torrent * t;
441
442    t = tr_new0( Torrent, 1 );
443    t->manager = manager;
444    t->tor = tor;
445    t->pool = tr_ptrArrayNew( );
446    t->peers = tr_ptrArrayNew( );
447    t->webseeds = tr_ptrArrayNew( );
448    t->outgoingHandshakes = tr_ptrArrayNew( );
449    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
450
451    for( i = 0; i < tor->info.webseedCount; ++i )
452    {
453        tr_webseed * w =
454            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc,
455                           t );
456        tr_ptrArrayAppend( t->webseeds, w );
457    }
458
459    return t;
460}
461
462/**
463 * For explanation, see http://www.bittorrent.org/fast_extensions.html
464 * Also see the "test-allowed-set" unit test
465 *
466 * @param k number of pieces in set
467 * @param sz number of pieces in the torrent
468 * @param infohash torrent's SHA1 hash
469 * @param ip peer's address
470 */
471struct tr_bitfield *
472tr_peerMgrGenerateAllowedSet(
473    const uint32_t         k,
474    const uint32_t         sz,
475    const                  uint8_t        *
476                           infohash,
477    const struct in_addr * ip )
478{
479    uint8_t       w[SHA_DIGEST_LENGTH + 4];
480    uint8_t       x[SHA_DIGEST_LENGTH];
481    tr_bitfield * a;
482    uint32_t      a_size;
483
484    *(uint32_t*)w = ntohl( htonl( ip->s_addr ) & 0xffffff00 );   /* (1) */
485    memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );              /* (2) */
486    tr_sha1( x, w, sizeof( w ), NULL );                        /* (3) */
487
488    a = tr_bitfieldNew( sz );
489    a_size = 0;
490
491    while( a_size < k )
492    {
493        int i;
494        for( i = 0; i < 5 && a_size < k; ++i )                      /* (4) */
495        {
496            uint32_t j = i * 4;                                /* (5) */
497            uint32_t y = ntohl( *( uint32_t* )( x + j ) );             /* (6) */
498            uint32_t index = y % sz;                           /* (7) */
499            if( !tr_bitfieldHas( a, index ) )                  /* (8) */
500            {
501                tr_bitfieldAdd( a, index );                    /* (9) */
502                ++a_size;
503            }
504        }
505        tr_sha1( x, x, sizeof( x ), NULL );                    /* (3) */
506    }
507
508    return a;
509}
510
511static int bandwidthPulse( void * vmgr );
512
513
514tr_peerMgr*
515tr_peerMgrNew( tr_session * session )
516{
517    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
518
519    m->session = session;
520    m->torrents = tr_ptrArrayNew( );
521    m->incomingHandshakes = tr_ptrArrayNew( );
522    m->bandwidthTimer = tr_timerNew( session, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
523    return m;
524}
525
526void
527tr_peerMgrFree( tr_peerMgr * manager )
528{
529    managerLock( manager );
530
531    tr_timerFree( &manager->bandwidthTimer );
532
533    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
534     * the item from manager->handshakes, so this is a little roundabout... */
535    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
536        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
537
538    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
539
540    /* free the torrents. */
541    tr_ptrArrayFree( manager->torrents, torrentDestructor );
542
543    managerUnlock( manager );
544    tr_free( manager );
545}
546
547static tr_peer**
548getConnectedPeers( Torrent * t,
549                   int *     setmeCount )
550{
551    int       i, peerCount, connectionCount;
552    tr_peer **peers;
553    tr_peer **ret;
554
555    assert( torrentIsLocked( t ) );
556
557    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
558    ret = tr_new( tr_peer *, peerCount );
559
560    for( i = connectionCount = 0; i < peerCount; ++i )
561        if( peers[i]->msgs )
562            ret[connectionCount++] = peers[i];
563
564    *setmeCount = connectionCount;
565    return ret;
566}
567
568static int
569clientIsDownloadingFrom( const tr_peer * peer )
570{
571    return peer->clientIsInterested && !peer->clientIsChoked;
572}
573
574static int
575clientIsUploadingTo( const tr_peer * peer )
576{
577    return peer->peerIsInterested && !peer->peerIsChoked;
578}
579
580/***
581****
582***/
583
584int
585tr_peerMgrPeerIsSeed( const tr_peerMgr *     mgr,
586                      const uint8_t *        torrentHash,
587                      const struct in_addr * addr )
588{
589    int                      isSeed = FALSE;
590    const Torrent *          t = NULL;
591    const struct peer_atom * atom = NULL;
592
593    t = getExistingTorrent( (tr_peerMgr*)mgr, torrentHash );
594    if( t )
595        atom = getExistingAtom( t, addr );
596    if( atom )
597        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
598
599    return isSeed;
600}
601
602/****
603*****
604*****  REFILL
605*****
606****/
607
608static void
609assertValidPiece( Torrent * t, tr_piece_index_t piece )
610{
611    assert( t );
612    assert( t->tor );
613    assert( piece < t->tor->info.pieceCount );
614}
615
616static int
617getPieceRequests( Torrent * t, tr_piece_index_t piece )
618{
619    assertValidPiece( t, piece );
620
621    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
622}
623
624static void
625incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
626{
627    assertValidPiece( t, piece );
628
629    if( t->pendingRequestCount == NULL )
630        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
631    t->pendingRequestCount[piece]++;
632}
633
634static void
635decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
636{
637    assertValidPiece( t, piece );
638
639    if( t->pendingRequestCount )
640        t->pendingRequestCount[piece]--;
641}
642
643struct tr_refill_piece
644{
645    tr_priority_t    priority;
646    uint32_t         piece;
647    uint32_t         peerCount;
648    int              random;
649    int              pendingRequestCount;
650    int              missingBlockCount;
651};
652
653static int
654compareRefillPiece( const void * aIn,
655                    const void * bIn )
656{
657    const struct tr_refill_piece * a = aIn;
658    const struct tr_refill_piece * b = bIn;
659
660    /* if one piece has a higher priority, it goes first */
661    if( a->priority != b->priority )
662        return a->priority > b->priority ? -1 : 1;
663
664    /* have a per-priority endgame */
665    if( a->pendingRequestCount != b->pendingRequestCount )
666        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
667
668    /* fewer missing pieces goes first */
669    if( a->missingBlockCount != b->missingBlockCount )
670        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
671
672    /* otherwise if one has fewer peers, it goes first */
673    if( a->peerCount != b->peerCount )
674        return a->peerCount < b->peerCount ? -1 : 1;
675
676    /* otherwise go with our random seed */
677    if( a->random != b->random )
678        return a->random < b->random ? -1 : 1;
679
680    return 0;
681}
682
683static tr_piece_index_t *
684getPreferredPieces( Torrent           * t,
685                    tr_piece_index_t  * pieceCount )
686{
687    const tr_torrent  * tor = t->tor;
688    const tr_info     * inf = &tor->info;
689    tr_piece_index_t    i;
690    tr_piece_index_t    poolSize = 0;
691    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
692    int                 peerCount;
693    tr_peer**           peers;
694
695    assert( torrentIsLocked( t ) );
696
697    peers = getConnectedPeers( t, &peerCount );
698
699    /* make a list of the pieces that we want but don't have */
700    for( i = 0; i < inf->pieceCount; ++i )
701        if( !tor->info.pieces[i].dnd
702                && !tr_cpPieceIsComplete( tor->completion, i ) )
703            pool[poolSize++] = i;
704
705    /* sort the pool by which to request next */
706    if( poolSize > 1 )
707    {
708        tr_piece_index_t j;
709        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
710
711        for( j = 0; j < poolSize; ++j )
712        {
713            int k;
714            const tr_piece_index_t piece = pool[j];
715            struct tr_refill_piece * setme = p + j;
716
717            setme->piece = piece;
718            setme->priority = inf->pieces[piece].priority;
719            setme->peerCount = 0;
720            setme->random = tr_cryptoWeakRandInt( INT_MAX );
721            setme->pendingRequestCount = getPieceRequests( t, piece );
722            setme->missingBlockCount
723                         = tr_cpMissingBlocksInPiece( tor->completion, piece );
724
725            for( k = 0; k < peerCount; ++k )
726            {
727                const tr_peer * peer = peers[k];
728                if( peer->peerIsInterested
729                        && !peer->clientIsChoked
730                        && tr_bitfieldHas( peer->have, piece ) )
731                    ++setme->peerCount;
732            }
733        }
734
735        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
736               compareRefillPiece );
737
738        for( j = 0; j < poolSize; ++j )
739            pool[j] = p[j].piece;
740
741        tr_free( p );
742    }
743
744    tr_free( peers );
745
746    *pieceCount = poolSize;
747    return pool;
748}
749
750struct tr_blockIterator
751{
752    Torrent * t;
753    tr_block_index_t blockIndex, blockCount, *blocks;
754    tr_piece_index_t pieceIndex, pieceCount, *pieces;
755};
756
757static struct tr_blockIterator*
758blockIteratorNew( Torrent * t )
759{
760    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
761    i->t = t;
762    i->pieces = getPreferredPieces( t, &i->pieceCount );
763    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCount );
764    return i;
765}
766
767static int
768blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
769{
770    int found;
771    Torrent * t = i->t;
772    tr_torrent * tor = t->tor;
773
774    while( ( i->blockIndex == i->blockCount )
775        && ( i->pieceIndex < i->pieceCount ) )
776    {
777        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
778        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
779        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
780        tr_block_index_t block;
781
782        assert( index < tor->info.pieceCount );
783
784        i->blockCount = 0;
785        i->blockIndex = 0;
786        for( block=b; block!=e; ++block )
787            if( !tr_cpBlockIsComplete( tor->completion, block ) )
788                i->blocks[i->blockCount++] = block;
789    }
790
791    if(( found = ( i->blockIndex < i->blockCount )))
792        *setme = i->blocks[i->blockIndex++];
793
794    return found;
795}
796
797static void
798blockIteratorFree( struct tr_blockIterator * i )
799{
800    tr_free( i->blocks );
801    tr_free( i->pieces );
802    tr_free( i );
803}
804
805static tr_peer**
806getPeersUploadingToClient( Torrent * t,
807                           int *     setmeCount )
808{
809    int j;
810    int peerCount = 0;
811    int retCount = 0;
812    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
813    tr_peer ** ret = tr_new( tr_peer *, peerCount );
814
815    j = 0; /* this is a temporary test to make sure we walk through all the peers */
816    if( peerCount )
817    {
818        /* Get a list of peers we're downloading from.
819           Pick a different starting point each time so all peers
820           get a chance at being the first in line */
821        const int fencepost = tr_cryptoWeakRandInt( peerCount );
822        int i = fencepost;
823        do {
824            if( clientIsDownloadingFrom( peers[i] ) )
825                ret[retCount++] = peers[i];
826            i = ( i + 1 ) % peerCount;
827            ++j;
828        } while( i != fencepost );
829    }
830    assert( j == peerCount );
831    *setmeCount = retCount;
832    return ret;
833}
834
835static uint32_t
836getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
837{
838    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
839    const uint64_t blockPos = tor->blockSize * b;
840    assert( blockPos >= piecePos );
841    return (uint32_t)( blockPos - piecePos );
842}
843
844static int
845refillPulse( void * vtorrent )
846{
847    tr_block_index_t block;
848    int peerCount;
849    int webseedCount;
850    tr_peer ** peers;
851    tr_webseed ** webseeds;
852    struct tr_blockIterator * blockIterator;
853    Torrent * t = vtorrent;
854    tr_torrent * tor = t->tor;
855
856    if( !t->isRunning )
857        return TRUE;
858    if( tr_torrentIsSeed( t->tor ) )
859        return TRUE;
860
861    torrentLock( t );
862    tordbg( t, "Refilling Request Buffers..." );
863
864    blockIterator = blockIteratorNew( t );
865    peers = getPeersUploadingToClient( t, &peerCount );
866    webseedCount = tr_ptrArraySize( t->webseeds );
867    webseeds = tr_memdup( tr_ptrArrayBase( t->webseeds ),
868                          webseedCount * sizeof( tr_webseed* ) );
869
870    while( ( webseedCount || peerCount )
871        && blockIteratorNext( blockIterator, &block ) )
872    {
873        int j;
874        int handled = FALSE;
875
876        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
877        const uint32_t offset = getBlockOffsetInPiece( tor, block );
878        const uint32_t length = tr_torBlockCountBytes( tor, block );
879
880        /* find a peer who can ask for this block */
881        for( j=0; !handled && j<peerCount; )
882        {
883            const int val = tr_peerMsgsAddRequest( peers[j]->msgs,
884                                                   index, offset, length );
885            switch( val )
886            {
887                case TR_ADDREQ_FULL:
888                case TR_ADDREQ_CLIENT_CHOKED:
889                    peers[j] = peers[--peerCount];
890                    break;
891
892                case TR_ADDREQ_MISSING:
893                case TR_ADDREQ_DUPLICATE:
894                    ++j;
895                    break;
896
897                case TR_ADDREQ_OK:
898                    incrementPieceRequests( t, index );
899                    handled = TRUE;
900                    break;
901
902                default:
903                    assert( 0 && "unhandled value" );
904                    break;
905            }
906        }
907
908        /* maybe one of the webseeds can do it */
909        for( j=0; !handled && j<webseedCount; )
910        {
911            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j],
912                                                          index, offset, length );
913            switch( val )
914            {
915                case TR_ADDREQ_FULL:
916                    webseeds[j] = webseeds[--webseedCount];
917                    break;
918
919                case TR_ADDREQ_OK:
920                    incrementPieceRequests( t, index );
921                    handled = TRUE;
922                    break;
923
924                default:
925                    assert( 0 && "unhandled value" );
926                    break;
927            }
928        }
929    }
930
931    /* cleanup */
932    blockIteratorFree( blockIterator );
933    tr_free( webseeds );
934    tr_free( peers );
935
936    t->refillTimer = NULL;
937    torrentUnlock( t );
938    return FALSE;
939}
940
941static void
942broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
943{
944    int i, size;
945    tr_peer ** peers;
946
947    assert( torrentIsLocked( t ) );
948
949    peers = getConnectedPeers( t, &size );
950    for( i=0; i<size; ++i )
951        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
952    tr_free( peers );
953}
954
955static void
956addStrike( Torrent * t,
957           tr_peer * peer )
958{
959    tordbg( t, "increasing peer %s strike count to %d",
960            tr_peerIoAddrStr( &peer->in_addr,
961                              peer->port ), peer->strikes + 1 );
962
963    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
964    {
965        struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
966        atom->myflags |= MYFLAG_BANNED;
967        peer->doPurge = 1;
968        tordbg( t, "banning peer %s",
969               tr_peerIoAddrStr( &atom->addr, atom->port ) );
970    }
971}
972
973static void
974gotBadPiece( Torrent *        t,
975             tr_piece_index_t pieceIndex )
976{
977    tr_torrent *   tor = t->tor;
978    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
979
980    tor->corruptCur += byteCount;
981    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
982}
983
984static void
985refillSoon( Torrent * t )
986{
987    if( t->refillTimer == NULL )
988        t->refillTimer = tr_timerNew( t->manager->session,
989                                      refillPulse, t,
990                                      REFILL_PERIOD_MSEC );
991}
992
993static void
994peerCallbackFunc( void * vpeer,
995                  void * vevent,
996                  void * vt )
997{
998    tr_peer *             peer = vpeer; /* may be NULL if peer is a webseed */
999    Torrent *             t = (Torrent *) vt;
1000    const tr_peer_event * e = vevent;
1001
1002    torrentLock( t );
1003
1004    switch( e->eventType )
1005    {
1006        case TR_PEER_NEED_REQ:
1007            refillSoon( t );
1008            break;
1009
1010        case TR_PEER_CANCEL:
1011            decrementPieceRequests( t, e->pieceIndex );
1012            break;
1013
1014        case TR_PEER_PEER_GOT_DATA:
1015        {
1016            const time_t now = time( NULL );
1017            tr_torrent * tor = t->tor;
1018            const tr_direction dir = TR_CLIENT_TO_PEER;
1019
1020            tor->activityDate = now;
1021
1022            if( e->wasPieceData )
1023                tor->uploadedCur += e->length;
1024
1025            /* add it to the raw upload speed */
1026            if( peer )
1027                tr_rcTransferred ( peer->rawSpeed[dir], e->length );
1028            tr_rcTransferred ( tor->rawSpeed[dir], e->length );
1029            tr_rcTransferred ( tor->session->rawSpeed[dir], e->length );
1030
1031            /* maybe add it to the piece upload speed */
1032            if( e->wasPieceData ) {
1033                if( peer )
1034                    tr_rcTransferred ( peer->pieceSpeed[dir], e->length );
1035                tr_rcTransferred ( tor->pieceSpeed[dir], e->length );
1036                tr_rcTransferred ( tor->session->pieceSpeed[dir], e->length );
1037            }
1038
1039            /* update the stats */
1040            if( e->wasPieceData )
1041                tr_statsAddUploaded( tor->session, e->length );
1042
1043            /* update our atom */
1044            if( peer ) {
1045                struct peer_atom * a = getExistingAtom( t, &peer->in_addr );
1046                a->piece_data_time = now;
1047            }
1048
1049            break;
1050        }
1051
1052        case TR_PEER_CLIENT_GOT_DATA:
1053        {
1054            const time_t now = time( NULL );
1055            tr_torrent * tor = t->tor;
1056            const tr_direction dir = TR_PEER_TO_CLIENT;
1057
1058            tor->activityDate = now;
1059
1060            /* only add this to downloadedCur if we got it from a peer --
1061             * webseeds shouldn't count against our ratio.  As one tracker
1062             * admin put it, "Those pieces are downloaded directly from the
1063             * content distributor, not the peers, it is the tracker's job
1064             * to manage the swarms, not the web server and does not fit
1065             * into the jurisdiction of the tracker." */
1066            if( peer && e->wasPieceData )
1067                tor->downloadedCur += e->length;
1068
1069            /* add it to our raw download speed */
1070            if( peer )
1071                tr_rcTransferred ( peer->rawSpeed[dir], e->length );
1072            tr_rcTransferred ( tor->rawSpeed[dir], e->length );
1073            tr_rcTransferred ( tor->session->rawSpeed[dir], e->length );
1074
1075            /* maybe add it to the piece upload speed */
1076            if( e->wasPieceData ) {
1077                if( peer )
1078                    tr_rcTransferred ( peer->pieceSpeed[dir], e->length );
1079                tr_rcTransferred ( tor->pieceSpeed[dir], e->length );
1080                tr_rcTransferred ( tor->session->pieceSpeed[dir], e->length );
1081            }
1082     
1083            /* update the stats */ 
1084            if( e->wasPieceData )
1085                tr_statsAddDownloaded( tor->session, e->length );
1086
1087            /* update our atom */
1088            if( peer ) {
1089                struct peer_atom * a = getExistingAtom( t, &peer->in_addr );
1090                a->piece_data_time = now;
1091            }
1092
1093            break;
1094        }
1095
1096        case TR_PEER_PEER_PROGRESS:
1097        {
1098            if( peer )
1099            {
1100                struct peer_atom * atom = getExistingAtom( t,
1101                                                           &peer->in_addr );
1102                const int          peerIsSeed = e->progress >= 1.0;
1103                if( peerIsSeed )
1104                {
1105                    tordbg( t, "marking peer %s as a seed",
1106                           tr_peerIoAddrStr( &atom->addr,
1107                                             atom->port ) );
1108                    atom->flags |= ADDED_F_SEED_FLAG;
1109                }
1110                else
1111                {
1112                    tordbg( t, "marking peer %s as a non-seed",
1113                           tr_peerIoAddrStr( &atom->addr,
1114                                             atom->port ) );
1115                    atom->flags &= ~ADDED_F_SEED_FLAG;
1116                }
1117            }
1118            break;
1119        }
1120
1121        case TR_PEER_CLIENT_GOT_BLOCK:
1122        {
1123            tr_torrent *     tor = t->tor;
1124
1125            tr_block_index_t block = _tr_block( tor, e->pieceIndex,
1126                                                e->offset );
1127
1128            tr_cpBlockAdd( tor->completion, block );
1129            decrementPieceRequests( t, e->pieceIndex );
1130
1131            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1132
1133            if( tr_cpPieceIsComplete( tor->completion, e->pieceIndex ) )
1134            {
1135                const tr_piece_index_t p = e->pieceIndex;
1136                const int              ok = tr_ioTestPiece( tor, p );
1137
1138                if( !ok )
1139                {
1140                    tr_torerr( tor,
1141                              _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1142                              (unsigned long)p );
1143                }
1144
1145                tr_torrentSetHasPiece( tor, p, ok );
1146                tr_torrentSetPieceChecked( tor, p, TRUE );
1147                tr_peerMgrSetBlame( tor->session->peerMgr, tor->info.hash, p, ok );
1148
1149                if( !ok )
1150                    gotBadPiece( t, p );
1151                else
1152                {
1153                    int        i, peerCount;
1154                    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1155                    for( i = 0; i < peerCount; ++i )
1156                        tr_peerMsgsHave( peers[i]->msgs, p );
1157                    tr_free( peers );
1158                }
1159
1160                tr_torrentRecheckCompleteness( tor );
1161            }
1162            break;
1163        }
1164
1165        case TR_PEER_ERROR:
1166            if( e->err == EINVAL )
1167            {
1168                addStrike( t, peer );
1169                peer->doPurge = 1;
1170            }
1171            else if( ( e->err == ERANGE )
1172                  || ( e->err == EMSGSIZE )
1173                  || ( e->err == ENOTCONN ) )
1174            {
1175                /* some protocol error from the peer */
1176                peer->doPurge = 1;
1177            }
1178            else /* a local error, such as an IO error */
1179            {
1180                t->tor->error = e->err;
1181                tr_strlcpy( t->tor->errorString,
1182                            tr_strerror( t->tor->error ),
1183                            sizeof( t->tor->errorString ) );
1184                tr_torrentStop( t->tor );
1185            }
1186            break;
1187
1188        default:
1189            assert( 0 );
1190    }
1191
1192    torrentUnlock( t );
1193}
1194
1195static void
1196ensureAtomExists( Torrent *              t,
1197                  const struct in_addr * addr,
1198                  uint16_t               port,
1199                  uint8_t                flags,
1200                  uint8_t                from )
1201{
1202    if( getExistingAtom( t, addr ) == NULL )
1203    {
1204        struct peer_atom * a;
1205        a = tr_new0( struct peer_atom, 1 );
1206        a->addr = *addr;
1207        a->port = port;
1208        a->flags = flags;
1209        a->from = from;
1210        tordbg( t, "got a new atom: %s",
1211               tr_peerIoAddrStr( &a->addr, a->port ) );
1212        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
1213    }
1214}
1215
1216static int
1217getMaxPeerCount( const tr_torrent * tor )
1218{
1219    return tor->maxConnectedPeers;
1220}
1221
1222static int
1223getPeerCount( const Torrent * t )
1224{
1225    return tr_ptrArraySize( t->peers ) + tr_ptrArraySize(
1226               t->outgoingHandshakes );
1227}
1228
1229/* FIXME: this is kind of a mess. */
1230static int
1231myHandshakeDoneCB( tr_handshake *  handshake,
1232                   tr_peerIo *     io,
1233                   int             isConnected,
1234                   const uint8_t * peer_id,
1235                   void *          vmanager )
1236{
1237    int                    ok = isConnected;
1238    int                    success = FALSE;
1239    uint16_t               port;
1240    const struct in_addr * addr;
1241    tr_peerMgr *           manager = (tr_peerMgr*) vmanager;
1242    Torrent *              t;
1243    tr_handshake *         ours;
1244
1245    assert( io );
1246    assert( isConnected == 0 || isConnected == 1 );
1247
1248    t = tr_peerIoHasTorrentHash( io )
1249        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1250        : NULL;
1251
1252    if( tr_peerIoIsIncoming ( io ) )
1253        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
1254                                        handshake, handshakeCompare );
1255    else if( t )
1256        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
1257                                        handshake, handshakeCompare );
1258    else
1259        ours = handshake;
1260
1261    assert( ours );
1262    assert( ours == handshake );
1263
1264    if( t )
1265        torrentLock( t );
1266
1267    addr = tr_peerIoGetAddress( io, &port );
1268
1269    if( !ok || !t || !t->isRunning )
1270    {
1271        if( t )
1272        {
1273            struct peer_atom * atom = getExistingAtom( t, addr );
1274            if( atom )
1275                ++atom->numFails;
1276        }
1277
1278        tr_peerIoFree( io );
1279    }
1280    else /* looking good */
1281    {
1282        struct peer_atom * atom;
1283        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1284        atom = getExistingAtom( t, addr );
1285        atom->time = time( NULL );
1286
1287        if( atom->myflags & MYFLAG_BANNED )
1288        {
1289            tordbg( t, "banned peer %s tried to reconnect",
1290                   tr_peerIoAddrStr( &atom->addr,
1291                                     atom->port ) );
1292            tr_peerIoFree( io );
1293        }
1294        else if( tr_peerIoIsIncoming( io )
1295               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1296
1297        {
1298            tr_peerIoFree( io );
1299        }
1300        else
1301        {
1302            tr_peer * peer = getExistingPeer( t, addr );
1303
1304            if( peer ) /* we already have this peer */
1305            {
1306                tr_peerIoFree( io );
1307            }
1308            else
1309            {
1310                peer = getPeer( t, addr );
1311                tr_free( peer->client );
1312
1313                if( !peer_id )
1314                    peer->client = NULL;
1315                else
1316                {
1317                    char client[128];
1318                    tr_clientForId( client, sizeof( client ), peer_id );
1319                    peer->client = tr_strdup( client );
1320                }
1321                peer->port = port;
1322                peer->io = io;
1323                peer->msgs =
1324                    tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t,
1325                                    &peer->msgsTag );
1326
1327                success = TRUE;
1328            }
1329        }
1330    }
1331
1332    if( t )
1333        torrentUnlock( t );
1334
1335    return success;
1336}
1337
1338void
1339tr_peerMgrAddIncoming( tr_peerMgr *     manager,
1340                       struct in_addr * addr,
1341                       uint16_t         port,
1342                       int              socket )
1343{
1344    managerLock( manager );
1345
1346    if( tr_sessionIsAddressBlocked( manager->session, addr ) )
1347    {
1348        tr_dbg( "Banned IP address \"%s\" tried to connect to us",
1349               inet_ntoa( *addr ) );
1350        tr_netClose( socket );
1351    }
1352    else if( getExistingHandshake( manager->incomingHandshakes, addr ) )
1353    {
1354        tr_netClose( socket );
1355    }
1356    else /* we don't have a connetion to them yet... */
1357    {
1358        tr_peerIo *    io;
1359        tr_handshake * handshake;
1360
1361        io = tr_peerIoNewIncoming( manager->session, addr, port, socket );
1362
1363        handshake = tr_handshakeNew( io,
1364                                     manager->session->encryptionMode,
1365                                     myHandshakeDoneCB,
1366                                     manager );
1367
1368        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake,
1369                                 handshakeCompare );
1370    }
1371
1372    managerUnlock( manager );
1373}
1374
1375void
1376tr_peerMgrAddPex( tr_peerMgr *    manager,
1377                  const uint8_t * torrentHash,
1378                  uint8_t         from,
1379                  const tr_pex *  pex )
1380{
1381    Torrent * t;
1382
1383    managerLock( manager );
1384
1385    t = getExistingTorrent( manager, torrentHash );
1386    if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->in_addr ) )
1387        ensureAtomExists( t, &pex->in_addr, pex->port, pex->flags, from );
1388
1389    managerUnlock( manager );
1390}
1391
1392tr_pex *
1393tr_peerMgrCompactToPex( const void *    compact,
1394                        size_t          compactLen,
1395                        const uint8_t * added_f,
1396                        size_t          added_f_len,
1397                        size_t *        pexCount )
1398{
1399    size_t          i;
1400    size_t          n = compactLen / 6;
1401    const uint8_t * walk = compact;
1402    tr_pex *        pex = tr_new0( tr_pex, n );
1403
1404    for( i = 0; i < n; ++i )
1405    {
1406        memcpy( &pex[i].in_addr, walk, 4 ); walk += 4;
1407        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1408        if( added_f && ( n == added_f_len ) )
1409            pex[i].flags = added_f[i];
1410    }
1411
1412    *pexCount = n;
1413    return pex;
1414}
1415
1416/**
1417***
1418**/
1419
1420void
1421tr_peerMgrSetBlame( tr_peerMgr *     manager,
1422                    const uint8_t *  torrentHash,
1423                    tr_piece_index_t pieceIndex,
1424                    int              success )
1425{
1426    if( !success )
1427    {
1428        int        peerCount, i;
1429        Torrent *  t = getExistingTorrent( manager, torrentHash );
1430        tr_peer ** peers;
1431
1432        assert( torrentIsLocked( t ) );
1433
1434        peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1435        for( i = 0; i < peerCount; ++i )
1436        {
1437            tr_peer * peer = peers[i];
1438            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1439            {
1440                tordbg(
1441                    t,
1442                    "peer %s contributed to corrupt piece (%d); now has %d strikes",
1443                    tr_peerIoAddrStr( &peer->in_addr, peer->port ),
1444                    pieceIndex, (int)peer->strikes + 1 );
1445                addStrike( t, peer );
1446            }
1447        }
1448    }
1449}
1450
1451int
1452tr_pexCompare( const void * va,
1453               const void * vb )
1454{
1455    const tr_pex * a = va;
1456    const tr_pex * b = vb;
1457    int            i =
1458        memcmp( &a->in_addr, &b->in_addr, sizeof( struct in_addr ) );
1459
1460    if( i ) return i;
1461    if( a->port < b->port ) return -1;
1462    if( a->port > b->port ) return 1;
1463    return 0;
1464}
1465
1466int tr_pexCompare( const void * a,
1467                   const void * b );
1468
1469static int
1470peerPrefersCrypto( const tr_peer * peer )
1471{
1472    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1473        return TRUE;
1474
1475    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1476        return FALSE;
1477
1478    return tr_peerIoIsEncrypted( peer->io );
1479}
1480
1481int
1482tr_peerMgrGetPeers( tr_peerMgr *    manager,
1483                    const uint8_t * torrentHash,
1484                    tr_pex **       setme_pex )
1485{
1486    int peerCount = 0;
1487    const Torrent *  t;
1488
1489    managerLock( manager );
1490
1491    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1492    if( !t )
1493    {
1494        *setme_pex = NULL;
1495    }
1496    else
1497    {
1498        int i;
1499        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1500        tr_pex * pex = tr_new( tr_pex, peerCount );
1501        tr_pex * walk = pex;
1502
1503        for( i = 0; i < peerCount; ++i, ++walk )
1504        {
1505            const tr_peer * peer = peers[i];
1506            walk->in_addr = peer->in_addr;
1507            walk->port = peer->port;
1508            walk->flags = 0;
1509            if( peerPrefersCrypto( peer ) ) walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1510            if( peer->progress >= 1.0 ) walk->flags |= ADDED_F_SEED_FLAG;
1511        }
1512
1513        assert( ( walk - pex ) == peerCount );
1514        qsort( pex, peerCount, sizeof( tr_pex ), tr_pexCompare );
1515        *setme_pex = pex;
1516    }
1517
1518    managerUnlock( manager );
1519    return peerCount;
1520}
1521
1522static int reconnectPulse( void * vtorrent );
1523
1524static int rechokePulse( void * vtorrent );
1525
1526void
1527tr_peerMgrStartTorrent( tr_peerMgr *    manager,
1528                        const uint8_t * torrentHash )
1529{
1530    Torrent * t;
1531
1532    managerLock( manager );
1533
1534    t = getExistingTorrent( manager, torrentHash );
1535
1536    assert( t );
1537    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1538    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1539
1540    if( !t->isRunning )
1541    {
1542        t->isRunning = 1;
1543
1544        t->reconnectTimer = tr_timerNew( t->manager->session,
1545                                         reconnectPulse, t,
1546                                         RECONNECT_PERIOD_MSEC );
1547
1548        t->rechokeTimer = tr_timerNew( t->manager->session,
1549                                       rechokePulse, t,
1550                                       RECHOKE_PERIOD_MSEC );
1551
1552        reconnectPulse( t );
1553
1554        rechokePulse( t );
1555
1556        if( !tr_ptrArrayEmpty( t->webseeds ) )
1557            refillSoon( t );
1558    }
1559
1560    managerUnlock( manager );
1561}
1562
1563static void
1564stopTorrent( Torrent * t )
1565{
1566    assert( torrentIsLocked( t ) );
1567
1568    t->isRunning = 0;
1569    tr_timerFree( &t->rechokeTimer );
1570    tr_timerFree( &t->reconnectTimer );
1571
1572    /* disconnect the peers. */
1573    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)peerDestructor );
1574    tr_ptrArrayClear( t->peers );
1575
1576    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1577     * which removes the handshake from t->outgoingHandshakes... */
1578    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1579        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1580}
1581
1582void
1583tr_peerMgrStopTorrent( tr_peerMgr *    manager,
1584                       const uint8_t * torrentHash )
1585{
1586    managerLock( manager );
1587
1588    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1589
1590    managerUnlock( manager );
1591}
1592
1593void
1594tr_peerMgrAddTorrent( tr_peerMgr * manager,
1595                      tr_torrent * tor )
1596{
1597    Torrent * t;
1598
1599    managerLock( manager );
1600
1601    assert( tor );
1602    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1603
1604    t = torrentConstructor( manager, tor );
1605    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1606
1607    managerUnlock( manager );
1608}
1609
1610void
1611tr_peerMgrRemoveTorrent( tr_peerMgr *    manager,
1612                         const uint8_t * torrentHash )
1613{
1614    Torrent * t;
1615
1616    managerLock( manager );
1617
1618    t = getExistingTorrent( manager, torrentHash );
1619    assert( t );
1620    stopTorrent( t );
1621    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1622    torrentDestructor( t );
1623
1624    managerUnlock( manager );
1625}
1626
1627void
1628tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1629                               const uint8_t *    torrentHash,
1630                               int8_t *           tab,
1631                               unsigned int       tabCount )
1632{
1633    tr_piece_index_t   i;
1634    const Torrent *    t;
1635    const tr_torrent * tor;
1636    float              interval;
1637    int                isComplete;
1638    int                peerCount;
1639    const tr_peer **   peers;
1640
1641    managerLock( manager );
1642
1643    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1644    tor = t->tor;
1645    interval = tor->info.pieceCount / (float)tabCount;
1646    isComplete = tor
1647                 && ( tr_cpGetStatus ( tor->completion ) == TR_CP_COMPLETE );
1648    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1649
1650    memset( tab, 0, tabCount );
1651
1652    for( i = 0; tor && i < tabCount; ++i )
1653    {
1654        const int piece = i * interval;
1655
1656        if( isComplete || tr_cpPieceIsComplete( tor->completion, piece ) )
1657            tab[i] = -1;
1658        else if( peerCount )
1659        {
1660            int j;
1661            for( j = 0; j < peerCount; ++j )
1662                if( tr_bitfieldHas( peers[j]->have, i ) )
1663                    ++tab[i];
1664        }
1665    }
1666
1667    managerUnlock( manager );
1668}
1669
1670/* Returns the pieces that are available from peers */
1671tr_bitfield*
1672tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1673                        const uint8_t *    torrentHash )
1674{
1675    int           i, size;
1676    Torrent *     t;
1677    tr_peer **    peers;
1678    tr_bitfield * pieces;
1679
1680    managerLock( manager );
1681
1682    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1683    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1684    peers = getConnectedPeers( t, &size );
1685    for( i = 0; i < size; ++i )
1686        tr_bitfieldOr( pieces, peers[i]->have );
1687
1688    managerUnlock( manager );
1689    tr_free( peers );
1690    return pieces;
1691}
1692
1693int
1694tr_peerMgrHasConnections( const tr_peerMgr * manager,
1695                          const uint8_t *    torrentHash )
1696{
1697    int             ret;
1698    const Torrent * t;
1699
1700    managerLock( manager );
1701
1702    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1703    ret = t && ( !tr_ptrArrayEmpty( t->peers )
1704               || !tr_ptrArrayEmpty( t->webseeds ) );
1705
1706    managerUnlock( manager );
1707    return ret;
1708}
1709
1710void
1711tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1712                        const uint8_t *    torrentHash,
1713                        int *              setmePeersKnown,
1714                        int *              setmePeersConnected,
1715                        int *              setmeSeedsConnected,
1716                        int *              setmeWebseedsSendingToUs,
1717                        int *              setmePeersSendingToUs,
1718                        int *              setmePeersGettingFromUs,
1719                        int *              setmePeersFrom )
1720{
1721    int                 i, size;
1722    const Torrent *     t;
1723    const tr_peer **    peers;
1724    const tr_webseed ** webseeds;
1725
1726    managerLock( manager );
1727
1728    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1729    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1730
1731    *setmePeersKnown           = tr_ptrArraySize( t->pool );
1732    *setmePeersConnected       = 0;
1733    *setmeSeedsConnected       = 0;
1734    *setmePeersGettingFromUs   = 0;
1735    *setmePeersSendingToUs     = 0;
1736    *setmeWebseedsSendingToUs  = 0;
1737
1738    for( i = 0; i < TR_PEER_FROM__MAX; ++i )
1739        setmePeersFrom[i] = 0;
1740
1741    for( i = 0; i < size; ++i )
1742    {
1743        const tr_peer *          peer = peers[i];
1744        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1745
1746        if( peer->io == NULL ) /* not connected */
1747            continue;
1748
1749        ++ * setmePeersConnected;
1750
1751        ++setmePeersFrom[atom->from];
1752
1753        if( clientIsDownloadingFrom( peer ) )
1754            ++ * setmePeersSendingToUs;
1755
1756        if( clientIsUploadingTo( peer ) )
1757            ++ * setmePeersGettingFromUs;
1758
1759        if( atom->flags & ADDED_F_SEED_FLAG )
1760            ++ * setmeSeedsConnected;
1761    }
1762
1763    webseeds = (const tr_webseed **) tr_ptrArrayPeek( t->webseeds, &size );
1764    for( i = 0; i < size; ++i )
1765    {
1766        if( tr_webseedIsActive( webseeds[i] ) )
1767            ++ * setmeWebseedsSendingToUs;
1768    }
1769
1770    managerUnlock( manager );
1771}
1772
1773float*
1774tr_peerMgrWebSpeeds( const tr_peerMgr * manager,
1775                     const uint8_t *    torrentHash )
1776{
1777    const Torrent *     t;
1778    const tr_webseed ** webseeds;
1779    int                 i;
1780    int                 webseedCount;
1781    float *             ret;
1782
1783    assert( manager );
1784    managerLock( manager );
1785
1786    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1787    webseeds = (const tr_webseed**) tr_ptrArrayPeek( t->webseeds,
1788                                                     &webseedCount );
1789    assert( webseedCount == t->tor->info.webseedCount );
1790    ret = tr_new0( float, webseedCount );
1791
1792    for( i = 0; i < webseedCount; ++i )
1793        if( !tr_webseedGetSpeed( webseeds[i], &ret[i] ) )
1794            ret[i] = -1.0;
1795
1796    managerUnlock( manager );
1797    return ret;
1798}
1799
1800double
1801tr_peerGetPieceSpeed( const tr_peer    * peer,
1802                      tr_direction       direction )
1803{
1804    assert( peer );
1805    assert( direction==TR_CLIENT_TO_PEER || direction==TR_PEER_TO_CLIENT );
1806
1807    return tr_rcRate( peer->pieceSpeed[direction] );
1808}
1809
1810
1811struct tr_peer_stat *
1812tr_peerMgrPeerStats( const   tr_peerMgr  * manager,
1813                     const   uint8_t     * torrentHash,
1814                     int                 * setmeCount UNUSED )
1815{
1816    int             i, size;
1817    const Torrent * t;
1818    tr_peer **      peers;
1819    tr_peer_stat *  ret;
1820
1821    assert( manager );
1822    managerLock( manager );
1823
1824    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1825    peers = getConnectedPeers( (Torrent*)t, &size );
1826    ret = tr_new0( tr_peer_stat, size );
1827
1828    for( i = 0; i < size; ++i )
1829    {
1830        char *                   pch;
1831        const tr_peer *          peer = peers[i];
1832        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1833        tr_peer_stat *           stat = ret + i;
1834
1835        tr_netNtop( &peer->in_addr, stat->addr, sizeof( stat->addr ) );
1836        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
1837                   sizeof( stat->client ) );
1838        stat->port               = ntohs( peer->port );
1839        stat->from               = atom->from;
1840        stat->progress           = peer->progress;
1841        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1842        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER );
1843        stat->rateToClient       = tr_peerGetPieceSpeed( peer, TR_PEER_TO_CLIENT );
1844        stat->peerIsChoked       = peer->peerIsChoked;
1845        stat->peerIsInterested   = peer->peerIsInterested;
1846        stat->clientIsChoked     = peer->clientIsChoked;
1847        stat->clientIsInterested = peer->clientIsInterested;
1848        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1849        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1850        stat->isUploadingTo      = clientIsUploadingTo( peer );
1851
1852        pch = stat->flagStr;
1853        if( t->optimistic == peer ) *pch++ = 'O';
1854        if( stat->isDownloadingFrom ) *pch++ = 'D';
1855        else if( stat->clientIsInterested ) *pch++ = 'd';
1856        if( stat->isUploadingTo ) *pch++ = 'U';
1857        else if( stat->peerIsInterested ) *pch++ = 'u';
1858        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ =
1859                'K';
1860        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1861        if( stat->isEncrypted ) *pch++ = 'E';
1862        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1863        if( stat->isIncoming ) *pch++ = 'I';
1864        *pch = '\0';
1865    }
1866
1867    *setmeCount = size;
1868    tr_free( peers );
1869
1870    managerUnlock( manager );
1871    return ret;
1872}
1873
1874/**
1875***
1876**/
1877
1878struct ChokeData
1879{
1880    unsigned int    doUnchoke    : 1;
1881    unsigned int    isInterested : 1;
1882    unsigned int    isChoked     : 1;
1883    int             rate;
1884    tr_peer *       peer;
1885};
1886
1887static int
1888compareChoke( const void * va,
1889              const void * vb )
1890{
1891    const struct ChokeData * a = va;
1892    const struct ChokeData * b = vb;
1893
1894    if( a->rate != b->rate ) /* prefer higher overall speeds */
1895        return a->rate > b->rate ? -1 : 1;
1896
1897    if( a->isChoked != b->isChoked ) /* prefer unchoked */
1898        return a->isChoked ? 1 : -1;
1899
1900    return 0;
1901}
1902
1903static int
1904isNew( const tr_peer * peer )
1905{
1906    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1907}
1908
1909static int
1910isSame( const tr_peer * peer )
1911{
1912    return peer && peer->client && strstr( peer->client, "Transmission" );
1913}
1914
1915/**
1916***
1917**/
1918
1919static void
1920rechoke( Torrent * t )
1921{
1922    int                i, peerCount, size, unchokedInterested;
1923    tr_peer **         peers = getConnectedPeers( t, &peerCount );
1924    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1925    const int          chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
1926
1927    assert( torrentIsLocked( t ) );
1928
1929    /* sort the peers by preference and rate */
1930    for( i = 0, size = 0; i < peerCount; ++i )
1931    {
1932        tr_peer * peer = peers[i];
1933        if( peer->progress >= 1.0 ) /* choke all seeds */
1934            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1935        else if( chokeAll )
1936            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1937        else {
1938            struct ChokeData * n = &choke[size++];
1939            n->peer         = peer;
1940            n->isInterested = peer->peerIsInterested;
1941            n->isChoked     = peer->peerIsChoked;
1942            n->rate = (int)(tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER )
1943                            + tr_peerGetPieceSpeed( peer, TR_PEER_TO_CLIENT ) );
1944        }
1945    }
1946
1947    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
1948
1949    /**
1950     * Reciprocation and number of uploads capping is managed by unchoking
1951     * the N peers which have the best upload rate and are interested.
1952     * This maximizes the client's download rate. These N peers are
1953     * referred to as downloaders, because they are interested in downloading
1954     * from the client.
1955     *
1956     * Peers which have a better upload rate (as compared to the downloaders)
1957     * but aren't interested get unchoked. If they become interested, the
1958     * downloader with the worst upload rate gets choked. If a client has
1959     * a complete file, it uses its upload rate rather than its download
1960     * rate to decide which peers to unchoke.
1961     */
1962    unchokedInterested = 0;
1963    for( i = 0; i < size && unchokedInterested < MAX_UNCHOKED_PEERS; ++i )
1964    {
1965        choke[i].doUnchoke = 1;
1966        if( choke[i].isInterested )
1967            ++unchokedInterested;
1968    }
1969
1970    /* optimistic unchoke */
1971    if( i < size )
1972    {
1973        int                n;
1974        struct ChokeData * c;
1975        tr_ptrArray *      randPool = tr_ptrArrayNew( );
1976
1977        for( ; i < size; ++i )
1978        {
1979            if( choke[i].isInterested )
1980            {
1981                const tr_peer * peer = choke[i].peer;
1982                int             x = 1, y;
1983                if( isNew( peer ) ) x *= 3;
1984                if( isSame( peer ) ) x *= 3;
1985                for( y = 0; y < x; ++y )
1986                    tr_ptrArrayAppend( randPool, &choke[i] );
1987            }
1988        }
1989
1990        if( ( n = tr_ptrArraySize( randPool ) ) )
1991        {
1992            c = tr_ptrArrayNth( randPool, tr_cryptoWeakRandInt( n ) );
1993            c->doUnchoke = 1;
1994            t->optimistic = c->peer;
1995        }
1996
1997        tr_ptrArrayFree( randPool, NULL );
1998    }
1999
2000    for( i = 0; i < size; ++i )
2001        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
2002
2003    /* cleanup */
2004    tr_free( choke );
2005    tr_free( peers );
2006}
2007
2008static int
2009rechokePulse( void * vtorrent )
2010{
2011    Torrent * t = vtorrent;
2012
2013    torrentLock( t );
2014    rechoke( t );
2015    torrentUnlock( t );
2016    return TRUE;
2017}
2018
2019/***
2020****
2021****  Life and Death
2022****
2023***/
2024
2025static int
2026shouldPeerBeClosed( const Torrent * t,
2027                    const tr_peer * peer,
2028                    int             peerCount )
2029{
2030    const tr_torrent *       tor = t->tor;
2031    const time_t             now = time( NULL );
2032    const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
2033
2034    /* if it's marked for purging, close it */
2035    if( peer->doPurge )
2036    {
2037        tordbg( t, "purging peer %s because its doPurge flag is set",
2038               tr_peerIoAddrStr( &atom->addr,
2039                                 atom->port ) );
2040        return TRUE;
2041    }
2042
2043    /* if we're seeding and the peer has everything we have,
2044     * and enough time has passed for a pex exchange, then disconnect */
2045    if( tr_torrentIsSeed( tor ) )
2046    {
2047        int peerHasEverything;
2048        if( atom->flags & ADDED_F_SEED_FLAG )
2049            peerHasEverything = TRUE;
2050        else if( peer->progress < tr_cpPercentDone( tor->completion ) )
2051            peerHasEverything = FALSE;
2052        else
2053        {
2054            tr_bitfield * tmp =
2055                tr_bitfieldDup( tr_cpPieceBitfield( tor->completion ) );
2056            tr_bitfieldDifference( tmp, peer->have );
2057            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2058            tr_bitfieldFree( tmp );
2059        }
2060        if( peerHasEverything
2061          && ( !tr_torrentAllowsPex( tor ) || ( now - atom->time >= 30 ) ) )
2062        {
2063            tordbg( t, "purging peer %s because we're both seeds",
2064                   tr_peerIoAddrStr( &atom->addr,
2065                                     atom->port ) );
2066            return TRUE;
2067        }
2068    }
2069
2070    /* disconnect if it's been too long since piece data has been transferred.
2071     * this is on a sliding scale based on number of available peers... */
2072    {
2073        const int    relaxStrictnessIfFewerThanN =
2074            (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2075        /* if we have >= relaxIfFewerThan, strictness is 100%.
2076         * if we have zero connections, strictness is 0% */
2077        const float  strictness = peerCount >= relaxStrictnessIfFewerThanN
2078                                  ? 1.0
2079                                  : peerCount /
2080                                  (float)relaxStrictnessIfFewerThanN;
2081        const int    lo = MIN_UPLOAD_IDLE_SECS;
2082        const int    hi = MAX_UPLOAD_IDLE_SECS;
2083        const int    limit = lo + ( ( hi - lo ) * strictness );
2084        const time_t then = peer->pieceDataActivityDate;
2085        const int    idleTime = then ? ( now - then ) : 0;
2086        if( idleTime > limit )
2087        {
2088            tordbg(
2089                t,
2090                "purging peer %s because it's been %d secs since we shared anything",
2091                tr_peerIoAddrStr( &atom->addr, atom->port ), idleTime );
2092            return TRUE;
2093        }
2094    }
2095
2096    return FALSE;
2097}
2098
2099static tr_peer **
2100getPeersToClose( Torrent * t,
2101                 int *     setmeSize )
2102{
2103    int               i, peerCount, outsize;
2104    tr_peer **        peers = (tr_peer**) tr_ptrArrayPeek( t->peers,
2105                                                           &peerCount );
2106    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2107
2108    assert( torrentIsLocked( t ) );
2109
2110    for( i = outsize = 0; i < peerCount; ++i )
2111        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
2112            ret[outsize++] = peers[i];
2113
2114    *setmeSize = outsize;
2115    return ret;
2116}
2117
2118static int
2119compareCandidates( const void * va,
2120                   const void * vb )
2121{
2122    const struct peer_atom * a = *(const struct peer_atom**) va;
2123    const struct peer_atom * b = *(const struct peer_atom**) vb;
2124
2125    /* <Charles> Here we would probably want to try reconnecting to
2126     * peers that had most recently given us data. Lots of users have
2127     * trouble with resets due to their routers and/or ISPs. This way we
2128     * can quickly recover from an unwanted reset. So we sort
2129     * piece_data_time in descending order.
2130     */
2131
2132    if( a->piece_data_time != b->piece_data_time )
2133        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2134
2135    if( a->numFails != b->numFails )
2136        return a->numFails < b->numFails ? -1 : 1;
2137
2138    if( a->time != b->time )
2139        return a->time < b->time ? -1 : 1;
2140
2141    return 0;
2142}
2143
2144static int
2145getReconnectIntervalSecs( const struct peer_atom * atom )
2146{
2147    int          sec;
2148    const time_t now = time( NULL );
2149
2150    /* if we were recently connected to this peer and transferring piece
2151     * data, try to reconnect to them sooner rather that later -- we don't
2152     * want network troubles to get in the way of a good peer. */
2153    if( ( now - atom->piece_data_time ) <=
2154       ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2155        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2156
2157    /* don't allow reconnects more often than our minimum */
2158    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2159        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2160
2161    /* otherwise, the interval depends on how many times we've tried
2162     * and failed to connect to the peer */
2163    else switch( atom->numFails )
2164        {
2165            case 0:
2166                sec = 0; break;
2167
2168            case 1:
2169                sec = 5; break;
2170
2171            case 2:
2172                sec = 2 * 60; break;
2173
2174            case 3:
2175                sec = 15 * 60; break;
2176
2177            case 4:
2178                sec = 30 * 60; break;
2179
2180            case 5:
2181                sec = 60 * 60; break;
2182
2183            default:
2184                sec = 120 * 60; break;
2185        }
2186
2187    return sec;
2188}
2189
2190static struct peer_atom **
2191getPeerCandidates(                               Torrent * t,
2192                                           int * setmeSize )
2193{
2194    int                 i, atomCount, retCount;
2195    struct peer_atom ** atoms;
2196    struct peer_atom ** ret;
2197    const time_t        now = time( NULL );
2198    const int           seed = tr_torrentIsSeed( t->tor );
2199
2200    assert( torrentIsLocked( t ) );
2201
2202    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &atomCount );
2203    ret = tr_new( struct peer_atom*, atomCount );
2204    for( i = retCount = 0; i < atomCount; ++i )
2205    {
2206        int                interval;
2207        struct peer_atom * atom = atoms[i];
2208
2209        /* peer fed us too much bad data ... we only keep it around
2210         * now to weed it out in case someone sends it to us via pex */
2211        if( atom->myflags & MYFLAG_BANNED )
2212            continue;
2213
2214        /* peer was unconnectable before, so we're not going to keep trying.
2215         * this is needs a separate flag from `banned', since if they try
2216         * to connect to us later, we'll let them in */
2217        if( atom->myflags & MYFLAG_UNREACHABLE )
2218            continue;
2219
2220        /* we don't need two connections to the same peer... */
2221        if( peerIsInUse( t, &atom->addr ) )
2222            continue;
2223
2224        /* no need to connect if we're both seeds... */
2225        if( seed && ( atom->flags & ADDED_F_SEED_FLAG ) )
2226            continue;
2227
2228        /* don't reconnect too often */
2229        interval = getReconnectIntervalSecs( atom );
2230        if( ( now - atom->time ) < interval )
2231        {
2232            tordbg(
2233                t,
2234                "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2235                i, tr_peerIoAddrStr( &atom->addr,
2236                                     atom->port ), interval );
2237            continue;
2238        }
2239
2240        /* Don't connect to peers in our blocklist */
2241        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2242            continue;
2243
2244        ret[retCount++] = atom;
2245    }
2246
2247    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2248    *setmeSize = retCount;
2249    return ret;
2250}
2251
2252static int
2253reconnectPulse( void * vtorrent )
2254{
2255    Torrent *     t = vtorrent;
2256    static time_t prevTime = 0;
2257    static int    newConnectionsThisSecond = 0;
2258    time_t        now;
2259
2260    torrentLock( t );
2261
2262    now = time( NULL );
2263    if( prevTime != now )
2264    {
2265        prevTime = now;
2266        newConnectionsThisSecond = 0;
2267    }
2268
2269    if( !t->isRunning )
2270    {
2271        removeAllPeers( t );
2272    }
2273    else
2274    {
2275        int                 i, nCandidates, nBad;
2276        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
2277        struct tr_peer **   connections = getPeersToClose( t, &nBad );
2278
2279        if( nBad || nCandidates )
2280            tordbg(
2281                t, "reconnect pulse for [%s]: %d bad connections, "
2282                   "%d connection candidates, %d atoms, max per pulse is %d",
2283                t->tor->info.name, nBad, nCandidates,
2284                tr_ptrArraySize( t->pool ),
2285                (int)MAX_RECONNECTIONS_PER_PULSE );
2286
2287        /* disconnect some peers.
2288           if we transferred piece data, then they might be good peers,
2289           so reset their `numFails' weight to zero.  otherwise we connected
2290           to them fruitlessly, so mark it as another fail */
2291        for( i = 0; i < nBad; ++i )
2292        {
2293            tr_peer *          peer = connections[i];
2294            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
2295            if( peer->pieceDataActivityDate )
2296                atom->numFails = 0;
2297            else
2298                ++atom->numFails;
2299            tordbg( t, "removing bad peer %s",
2300                   tr_peerIoGetAddrStr( peer->io ) );
2301            removePeer( t, peer );
2302        }
2303
2304        /* add some new ones */
2305        for( i = 0;    ( i < nCandidates )
2306           && ( i < MAX_RECONNECTIONS_PER_PULSE )
2307           && ( getPeerCount( t ) < getMaxPeerCount( t->tor ) )
2308           && ( newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND );
2309             ++i )
2310        {
2311            tr_peerMgr *       mgr = t->manager;
2312            struct peer_atom * atom = candidates[i];
2313            tr_peerIo *        io;
2314
2315            tordbg( t, "Starting an OUTGOING connection with %s",
2316                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2317
2318            io =
2319                tr_peerIoNewOutgoing( mgr->session, &atom->addr, atom->port,
2320                                      t->hash );
2321            if( io == NULL )
2322            {
2323                atom->myflags |= MYFLAG_UNREACHABLE;
2324            }
2325            else
2326            {
2327                tr_handshake * handshake = tr_handshakeNew(
2328                    io,
2329                    mgr->session->
2330                    encryptionMode,
2331                    myHandshakeDoneCB,
2332                    mgr );
2333
2334                assert( tr_peerIoGetTorrentHash( io ) );
2335
2336                ++newConnectionsThisSecond;
2337
2338                tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake,
2339                                         handshakeCompare );
2340            }
2341
2342            atom->time = time( NULL );
2343        }
2344
2345        /* cleanup */
2346        tr_free( connections );
2347        tr_free( candidates );
2348    }
2349
2350    torrentUnlock( t );
2351    return TRUE;
2352}
2353
2354/****
2355*****
2356*****  BANDWIDTH ALLOCATION
2357*****
2358****/
2359
2360#if 0
2361#define DEBUG_DIRECTION TR_UP
2362#endif
2363
2364static double
2365bytesPerPulse( double KiB_per_second )
2366{
2367    return KiB_per_second * ( 1024.0 * BANDWIDTH_PERIOD_MSEC / 1000.0 );
2368}
2369
2370/*
2371 * @param currentSpeed current speed in KiB/s
2372 * @param desiredSpeed desired speed in KiB/s
2373 */
2374static double
2375allocateHowMuch( tr_direction dir UNUSED, double currentSpeed, double desiredSpeed )
2376{
2377    const int pulses_per_history = TR_RATECONTROL_HISTORY_MSEC / BANDWIDTH_PERIOD_MSEC;
2378    const double current_bytes_per_pulse = bytesPerPulse( currentSpeed );
2379    const double desired_bytes_per_pulse = bytesPerPulse( desiredSpeed );
2380    const double min = desired_bytes_per_pulse * 0.90;
2381    const double max = desired_bytes_per_pulse * 1.25;
2382    const double next_pulse_bytes = desired_bytes_per_pulse * ( pulses_per_history + 1 )
2383                                  - ( current_bytes_per_pulse * pulses_per_history );
2384    double clamped;
2385
2386    /* clamp the return value to lessen oscillation */
2387    clamped = next_pulse_bytes;
2388    clamped = MAX( clamped, min );
2389    clamped = MIN( clamped, max );
2390
2391#ifdef DEBUG_DIRECTION
2392if( dir == DEBUG_DIRECTION )
2393fprintf( stderr, "currentSpeed(%5.2f) desiredSpeed(%5.2f), allocating %5.2f (%5.2f)\n",
2394         currentSpeed,
2395         desiredSpeed,
2396         clamped/1024.0,
2397         next_pulse_bytes/1024.0 );
2398#endif
2399
2400    return clamped;
2401}
2402/**
2403 * Distributes a fixed amount of bandwidth among a set of peers.
2404 *
2405 * @param peerArray peers whose client-to-peer bandwidth will be set
2406 * @param direction whether to allocate upload or download bandwidth
2407 * @param currentSpeed current speed in KiB/s for this set of peers
2408 * @param desiredSpeed desired speed in KiB/s for this set of peers
2409 */
2410static void
2411setPeerBandwidth( tr_ptrArray          * peerArray,
2412                  const tr_direction     direction,
2413                  double                 currentSpeed,
2414                  double                 desiredSpeed )
2415{
2416    const int    MINIMUM_WELFARE_BYTES = bytesPerPulse( 5 );
2417    int          i;
2418    double       welfare;
2419    double       meritBytes;
2420    double       meritMultiplier;
2421    double       welfareBytes;
2422    const int    peerCount      = tr_ptrArraySize( peerArray );
2423    const double bytes          = allocateHowMuch( direction, currentSpeed, desiredSpeed );
2424    tr_peer **   peers          = (tr_peer**) tr_ptrArrayBase( peerArray );
2425
2426    /* how many bytes we'll allocate based on merit.
2427     *
2428     * 1. When just getting started we want to give all the peers a lot
2429     *    of `welfare' allocation because we don't know which ones will
2430     *    turn out to be productive for us.
2431     *
2432     * 2. When we've reached steady state and are near our bandwidth limit,
2433     *    the bandwidth spent on `welfare' is going to come from peers that
2434     *    we already know are productive... which is probably a waste.
2435     *
2436     * 3. So we tie the merit/welfare allocations to the current speed.
2437     *    the closer the current speed gets to the maximum speed, the less
2438     *    welfare we allocate.
2439     *
2440     * 4. We always need to allocate /some/ welfare bytes, otherwise
2441     *    the other peers will starve.
2442     */
2443    meritBytes = bytesPerPulse( MIN( currentSpeed * 1.2, desiredSpeed ) );
2444    welfareBytes = bytes > meritBytes ?  bytes - meritBytes : 0;
2445    if( welfareBytes < MINIMUM_WELFARE_BYTES )
2446        welfareBytes = MINIMUM_WELFARE_BYTES;
2447    meritBytes = bytes - welfareBytes;
2448    meritMultiplier = currentSpeed > 0.01 ?  meritBytes / currentSpeed : 0.0;
2449
2450#ifdef DEBUG_DIRECTION
2451if( direction == DEBUG_DIRECTION )
2452fprintf( stderr, "currentSpeed(%5.2f) desiredSpeed(%5.2f) - k[%.1f] merit [%.1f] welfare [%.1f]\n", currentSpeed, desiredSpeed, bytes/1024.0, meritBytes/1024.0, welfareBytes/1024.0 );
2453#endif
2454
2455    /* how much welfare each peer gets */
2456    welfare = welfareBytes / peerCount; 
2457
2458    for( i=0; i<peerCount; ++i )
2459    {
2460        tr_peer * peer = peers[i];
2461        const size_t merit = tr_rcRate( peers[i]->pieceSpeed[direction] ) * meritMultiplier;
2462        tr_peerIoAllocateBandwidth( peer->io, direction, merit + welfare );
2463    }
2464}
2465
2466static void
2467givePeersUnlimitedBandwidth( tr_ptrArray * peers,
2468                             tr_direction  direction )
2469{
2470    const int n = tr_ptrArraySize( peers );
2471    int       i;
2472
2473    for( i = 0; i < n; ++i )
2474    {
2475        tr_peer * peer = tr_ptrArrayNth( peers, i );
2476        tr_peerIoSetBandwidthUnlimited( peer->io, direction );
2477    }
2478}
2479
2480static void
2481pumpAllPeers( tr_peerMgr * mgr )
2482{
2483    const int torrentCount = tr_ptrArraySize( mgr->torrents );
2484    int       i, j;
2485
2486    for( i = 0; i < torrentCount; ++i )
2487    {
2488        Torrent * t = tr_ptrArrayNth( mgr->torrents, i );
2489        for( j = 0; j < tr_ptrArraySize( t->peers ); ++j )
2490        {
2491            tr_peer * peer = tr_ptrArrayNth( t->peers, j );
2492            tr_peerMsgsPulse( peer->msgs );
2493        }
2494    }
2495}
2496
2497static void
2498getBandwidthPeers( Torrent       * t,
2499                   tr_direction    dir,
2500                   tr_ptrArray   * appendme,
2501                   double        * speed )
2502{
2503    int i, peerCount;
2504    tr_peer ** peers;
2505
2506    assert( t );
2507    assert( torrentIsLocked( t ) );
2508    assert( dir == TR_UP || dir == TR_DOWN );
2509    assert( appendme );
2510    assert( speed );
2511
2512    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
2513
2514    for( i=0; i<peerCount; ++i )
2515    {
2516        tr_peer * p = peers[i];
2517
2518        if( p->msgs )
2519        {
2520            if( ( ( dir == TR_DOWN ) && clientIsDownloadingFrom( p ) ) || ( ( dir == TR_UP ) && clientIsUploadingTo( p ) ) )
2521            {
2522                tr_ptrArrayAppend( appendme, p );
2523                *speed += tr_rcRate( p->pieceSpeed[dir] );
2524            }
2525        }
2526    }
2527}
2528
2529/**
2530 * Allocate bandwidth for each peer connection.
2531 *
2532 * @param mgr the peer manager
2533 * @param direction whether to allocate upload or download bandwidth
2534 * @return the amount of directional bandwidth used since the last pulse.
2535 */
2536static void
2537allocateBandwidth( tr_peerMgr * mgr,
2538                   tr_direction direction )
2539{
2540    tr_session *  session = mgr->session;
2541    const int     torrentCount = tr_ptrArraySize( mgr->torrents );
2542    Torrent **    torrents = (Torrent **) tr_ptrArrayBase( mgr->torrents );
2543    tr_ptrArray * globalPool = tr_ptrArrayNew( );
2544    double        globalPoolSpeed = 0;
2545    int           i;
2546
2547    assert( mgr );
2548    assert( direction == TR_UP || direction == TR_DOWN );
2549
2550    /* before allocating bandwidth, pump the connected peers */
2551    pumpAllPeers( mgr );
2552
2553    for( i=0; i<torrentCount; ++i )
2554    {
2555        Torrent * t = torrents[i];
2556        tr_speedlimit speedMode;
2557
2558        /* no point in allocating bandwidth for stopped torrents */
2559        if( tr_torrentGetActivity( t->tor ) == TR_STATUS_STOPPED )
2560            continue;
2561
2562        /* if piece data is disallowed, don't bother limiting bandwidth --
2563         * we won't be asking for, or sending out, any pieces */
2564        if( !tr_torrentIsPieceTransferAllowed( t->tor, direction ) )
2565            speedMode = TR_SPEEDLIMIT_UNLIMITED;
2566        else
2567            speedMode = tr_torrentGetSpeedMode( t->tor, direction );
2568           
2569        /* process the torrent's peers based on its speed mode */
2570        switch( speedMode )
2571        {
2572            case TR_SPEEDLIMIT_UNLIMITED:
2573                givePeersUnlimitedBandwidth( t->peers, direction );
2574                break;
2575
2576            case TR_SPEEDLIMIT_SINGLE: {
2577                tr_ptrArray * peers = tr_ptrArrayNew( );
2578                double speed = 0;
2579                getBandwidthPeers( t, direction, peers, &speed );
2580                setPeerBandwidth( peers, direction, speed, tr_torrentGetSpeedLimit( t->tor, direction ) );
2581                tr_ptrArrayFree( peers, NULL );
2582                break;
2583            }
2584
2585            case TR_SPEEDLIMIT_GLOBAL:
2586                getBandwidthPeers( t, direction, globalPool, &globalPoolSpeed );
2587                break;
2588        }
2589    }
2590
2591    /* handle the global pool's connections */
2592    if( !tr_sessionIsSpeedLimitEnabled( session, direction ) )
2593        givePeersUnlimitedBandwidth( globalPool, direction );
2594    else
2595        setPeerBandwidth( globalPool, direction, globalPoolSpeed,
2596                          tr_sessionGetSpeedLimit( session, direction ) );
2597
2598    /* now that we've allocated bandwidth, pump all the connected peers */
2599    pumpAllPeers( mgr );
2600
2601    /* cleanup */
2602    tr_ptrArrayFree( globalPool, NULL );
2603}
2604
2605static int
2606bandwidthPulse( void * vmgr )
2607{
2608    tr_peerMgr * mgr = vmgr;
2609    int          i;
2610
2611    managerLock( mgr );
2612
2613    /* allocate the upload and download bandwidth */
2614    for( i = 0; i < 2; ++i )
2615        allocateBandwidth( mgr, i );
2616
2617    managerUnlock( mgr );
2618    return TRUE;
2619}
2620
Note: See TracBrowser for help on using the repository browser.