source: trunk/libtransmission/peer-mgr.c @ 7069

Last change on this file since 7069 was 7069, checked in by charles, 10 years ago

more fucking around with the speed measurements.

  • Property svn:keywords set to Date Rev Author Id
File size: 74.0 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 7069 2008-11-08 02:49:04Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h> /* memcpy, memcmp, strstr */
16#include <stdlib.h> /* qsort */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "blocklist.h"
23#include "clients.h"
24#include "completion.h"
25#include "crypto.h"
26#include "handshake.h"
27#include "inout.h" /* tr_ioTestPiece */
28#include "net.h"
29#include "peer-io.h"
30#include "peer-mgr.h"
31#include "peer-mgr-private.h"
32#include "peer-msgs.h"
33#include "ptrarray.h"
34#include "ratecontrol.h"
35#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
36#include "torrent.h"
37#include "trevent.h"
38#include "utils.h"
39#include "webseed.h"
40
41enum
42{
43    /* how frequently to change which peers are choked */
44    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
45
46    /* minimum interval for refilling peers' request lists */
47    REFILL_PERIOD_MSEC = 333,
48
49    /* when many peers are available, keep idle ones this long */
50    MIN_UPLOAD_IDLE_SECS = ( 60 * 3 ),
51
52    /* when few peers are available, keep idle ones this long */
53    MAX_UPLOAD_IDLE_SECS = ( 60 * 10 ),
54
55    /* how frequently to decide which peers live and die */
56    RECONNECT_PERIOD_MSEC = ( 2 * 1000 ),
57   
58    /* how frequently to reallocate bandwidth */
59    BANDWIDTH_PERIOD_MSEC = 250,
60
61    /* max # of peers to ask fer per torrent per reconnect pulse */
62    MAX_RECONNECTIONS_PER_PULSE = 2,
63
64    /* max number of peers to ask for per second overall.
65    * this throttle is to avoid overloading the router */
66    MAX_CONNECTIONS_PER_SECOND = 4,
67
68    /* number of unchoked peers per torrent.
69     * FIXME: this probably ought to be configurable */
70    MAX_UNCHOKED_PEERS = 12,
71
72    /* number of bad pieces a peer is allowed to send before we ban them */
73    MAX_BAD_PIECES_PER_PEER = 3,
74
75    /* use for bitwise operations w/peer_atom.myflags */
76    MYFLAG_BANNED = 1,
77
78    /* unreachable for now... but not banned.
79     * if they try to connect to us it's okay */
80    MYFLAG_UNREACHABLE = 2,
81
82    /* the minimum we'll wait before attempting to reconnect to a peer */
83    MINIMUM_RECONNECT_INTERVAL_SECS = 5
84};
85
86
87/**
88***
89**/
90
91/* We keep one of these for every peer we know about, whether
92 * it's connected or not, so the struct must be small.
93 * When our current connections underperform, we dip back
94 * into this list for new ones. */
95struct peer_atom
96{
97    uint8_t           from;
98    uint8_t           flags; /* these match the added_f flags */
99    uint8_t           myflags; /* flags that aren't defined in added_f */
100    uint16_t          port;
101    uint16_t          numFails;
102    struct in_addr    addr;
103    time_t            time; /* when the peer's connection status last changed */
104    time_t            piece_data_time;
105};
106
107typedef struct
108{
109    unsigned int    isRunning : 1;
110
111    uint8_t         hash[SHA_DIGEST_LENGTH];
112    int         *   pendingRequestCount;
113    tr_ptrArray *   outgoingHandshakes; /* tr_handshake */
114    tr_ptrArray *   pool; /* struct peer_atom */
115    tr_ptrArray *   peers; /* tr_peer */
116    tr_ptrArray *   webseeds; /* tr_webseed */
117    tr_timer *      reconnectTimer;
118    tr_timer *      rechokeTimer;
119    tr_timer *      refillTimer;
120    tr_torrent *    tor;
121    tr_peer *       optimistic; /* the optimistic peer, or NULL if none */
122
123    struct tr_peerMgr * manager;
124}
125Torrent;
126
127struct tr_peerMgr
128{
129    tr_session      * session;
130    tr_ptrArray     * torrents; /* Torrent */
131    tr_ptrArray     * incomingHandshakes; /* tr_handshake */
132    tr_timer        * bandwidthTimer;
133    tr_ratecontrol  * globalPoolRawSpeed[2];
134};
135
136#define tordbg( t, ... ) \
137    do { \
138        if( tr_deepLoggingIsActive( ) ) \
139            tr_deepLog( __FILE__, __LINE__, t->tor->info.name, __VA_ARGS__ ); \
140    } while( 0 )
141
142#define dbgmsg( ... ) \
143    do { \
144        if( tr_deepLoggingIsActive( ) ) \
145            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
146    } while( 0 )
147
148/**
149***
150**/
151
152static void
153managerLock( const struct tr_peerMgr * manager )
154{
155    tr_globalLock( manager->session );
156}
157
158static void
159managerUnlock( const struct tr_peerMgr * manager )
160{
161    tr_globalUnlock( manager->session );
162}
163
164static void
165torrentLock( Torrent * torrent )
166{
167    managerLock( torrent->manager );
168}
169
170static void
171torrentUnlock( Torrent * torrent )
172{
173    managerUnlock( torrent->manager );
174}
175
176static int
177torrentIsLocked( const Torrent * t )
178{
179    return tr_globalIsLocked( t->manager->session );
180}
181
182/**
183***
184**/
185
186static int
187compareAddresses( const struct in_addr * a,
188                  const struct in_addr * b )
189{
190    if( a->s_addr != b->s_addr )
191        return a->s_addr < b->s_addr ? -1 : 1;
192
193    return 0;
194}
195
196static int
197handshakeCompareToAddr( const void * va,
198                        const void * vb )
199{
200    const tr_handshake * a = va;
201
202    return compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
203}
204
205static int
206handshakeCompare( const void * a,
207                  const void * b )
208{
209    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
210}
211
212static tr_handshake*
213getExistingHandshake( tr_ptrArray *          handshakes,
214                      const struct in_addr * in_addr )
215{
216    return tr_ptrArrayFindSorted( handshakes,
217                                  in_addr,
218                                  handshakeCompareToAddr );
219}
220
221static int
222comparePeerAtomToAddress( const void * va,
223                          const void * vb )
224{
225    const struct peer_atom * a = va;
226
227    return compareAddresses( &a->addr, vb );
228}
229
230static int
231comparePeerAtoms( const void * va,
232                  const void * vb )
233{
234    const struct peer_atom * b = vb;
235
236    return comparePeerAtomToAddress( va, &b->addr );
237}
238
239/**
240***
241**/
242
243static int
244torrentCompare( const void * va,
245                const void * vb )
246{
247    const Torrent * a = va;
248    const Torrent * b = vb;
249
250    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
251}
252
253static int
254torrentCompareToHash( const void * va,
255                      const void * vb )
256{
257    const Torrent * a = va;
258    const uint8_t * b_hash = vb;
259
260    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
261}
262
263static Torrent*
264getExistingTorrent( tr_peerMgr *    manager,
265                    const uint8_t * hash )
266{
267    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
268                                             hash,
269                                             torrentCompareToHash );
270}
271
272static int
273peerCompare( const void * va,
274             const void * vb )
275{
276    const tr_peer * a = va;
277    const tr_peer * b = vb;
278
279    return compareAddresses( &a->in_addr, &b->in_addr );
280}
281
282static int
283peerCompareToAddr( const void * va,
284                   const void * vb )
285{
286    const tr_peer * a = va;
287
288    return compareAddresses( &a->in_addr, vb );
289}
290
291static tr_peer*
292getExistingPeer( Torrent *              torrent,
293                 const struct in_addr * in_addr )
294{
295    assert( torrentIsLocked( torrent ) );
296    assert( in_addr );
297
298    return tr_ptrArrayFindSorted( torrent->peers,
299                                  in_addr,
300                                  peerCompareToAddr );
301}
302
303static struct peer_atom*
304getExistingAtom( const                  Torrent * t,
305                 const struct in_addr * addr )
306{
307    assert( torrentIsLocked( t ) );
308    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
309}
310
311static int
312peerIsInUse( const Torrent *        ct,
313             const struct in_addr * addr )
314{
315    Torrent * t = (Torrent*) ct;
316
317    assert( torrentIsLocked ( t ) );
318
319    return getExistingPeer( t, addr )
320           || getExistingHandshake( t->outgoingHandshakes, addr )
321           || getExistingHandshake( t->manager->incomingHandshakes, addr );
322}
323
324static tr_peer*
325peerConstructor( const struct in_addr * in_addr )
326{
327    tr_peer * p;
328
329    p = tr_new0( tr_peer, 1 );
330    memcpy( &p->in_addr, in_addr, sizeof( struct in_addr ) );
331    p->pieceSpeed[TR_CLIENT_TO_PEER] = tr_rcInit( );
332    p->pieceSpeed[TR_PEER_TO_CLIENT] = tr_rcInit( );
333    return p;
334}
335
336static tr_peer*
337getPeer( Torrent *              torrent,
338         const struct in_addr * in_addr )
339{
340    tr_peer * peer;
341
342    assert( torrentIsLocked( torrent ) );
343
344    peer = getExistingPeer( torrent, in_addr );
345
346    if( peer == NULL )
347    {
348        peer = peerConstructor( in_addr );
349        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
350    }
351
352    return peer;
353}
354
355static void
356peerDestructor( tr_peer * peer )
357{
358    assert( peer );
359    assert( peer->msgs );
360
361    tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
362    tr_peerMsgsFree( peer->msgs );
363
364    tr_peerIoFree( peer->io );
365
366    tr_bitfieldFree( peer->have );
367    tr_bitfieldFree( peer->blame );
368    tr_free( peer->client );
369
370    tr_rcClose( peer->pieceSpeed[TR_CLIENT_TO_PEER] );
371    tr_rcClose( peer->pieceSpeed[TR_PEER_TO_CLIENT] );
372    tr_free( peer );
373}
374
375static void
376removePeer( Torrent * t,
377            tr_peer * peer )
378{
379    tr_peer *          removed;
380    struct peer_atom * atom;
381
382    assert( torrentIsLocked( t ) );
383
384    atom = getExistingAtom( t, &peer->in_addr );
385    assert( atom );
386    atom->time = time( NULL );
387
388    removed = tr_ptrArrayRemoveSorted( t->peers, peer, peerCompare );
389    assert( removed == peer );
390    peerDestructor( removed );
391}
392
393static void
394removeAllPeers( Torrent * t )
395{
396    while( !tr_ptrArrayEmpty( t->peers ) )
397        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
398}
399
400static void
401torrentDestructor( void * vt )
402{
403    Torrent * t = vt;
404    uint8_t   hash[SHA_DIGEST_LENGTH];
405
406    assert( t );
407    assert( !t->isRunning );
408    assert( t->peers );
409    assert( torrentIsLocked( t ) );
410    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
411    assert( tr_ptrArrayEmpty( t->peers ) );
412
413    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
414
415    tr_timerFree( &t->reconnectTimer );
416    tr_timerFree( &t->rechokeTimer );
417    tr_timerFree( &t->refillTimer );
418
419    tr_ptrArrayFree( t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
420    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
421    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
422    tr_ptrArrayFree( t->peers, NULL );
423
424    tr_free( t->pendingRequestCount );
425    tr_free( t );
426}
427
428static void peerCallbackFunc( void * vpeer,
429                              void * vevent,
430                              void * vt );
431
432static Torrent*
433torrentConstructor( tr_peerMgr * manager,
434                    tr_torrent * tor )
435{
436    int       i;
437    Torrent * t;
438
439    t = tr_new0( Torrent, 1 );
440    t->manager = manager;
441    t->tor = tor;
442    t->pool = tr_ptrArrayNew( );
443    t->peers = tr_ptrArrayNew( );
444    t->webseeds = tr_ptrArrayNew( );
445    t->outgoingHandshakes = tr_ptrArrayNew( );
446    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
447
448    for( i = 0; i < tor->info.webseedCount; ++i )
449    {
450        tr_webseed * w =
451            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc,
452                           t );
453        tr_ptrArrayAppend( t->webseeds, w );
454    }
455
456    return t;
457}
458
459/**
460 * For explanation, see http://www.bittorrent.org/fast_extensions.html
461 * Also see the "test-allowed-set" unit test
462 *
463 * @param k number of pieces in set
464 * @param sz number of pieces in the torrent
465 * @param infohash torrent's SHA1 hash
466 * @param ip peer's address
467 */
468struct tr_bitfield *
469tr_peerMgrGenerateAllowedSet(
470    const uint32_t         k,
471    const uint32_t         sz,
472    const                  uint8_t        *
473                           infohash,
474    const struct in_addr * ip )
475{
476    uint8_t       w[SHA_DIGEST_LENGTH + 4];
477    uint8_t       x[SHA_DIGEST_LENGTH];
478    tr_bitfield * a;
479    uint32_t      a_size;
480
481    *(uint32_t*)w = ntohl( htonl( ip->s_addr ) & 0xffffff00 );   /* (1) */
482    memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );              /* (2) */
483    tr_sha1( x, w, sizeof( w ), NULL );                        /* (3) */
484
485    a = tr_bitfieldNew( sz );
486    a_size = 0;
487
488    while( a_size < k )
489    {
490        int i;
491        for( i = 0; i < 5 && a_size < k; ++i )                      /* (4) */
492        {
493            uint32_t j = i * 4;                                /* (5) */
494            uint32_t y = ntohl( *( uint32_t* )( x + j ) );             /* (6) */
495            uint32_t index = y % sz;                           /* (7) */
496            if( !tr_bitfieldHas( a, index ) )                  /* (8) */
497            {
498                tr_bitfieldAdd( a, index );                    /* (9) */
499                ++a_size;
500            }
501        }
502        tr_sha1( x, x, sizeof( x ), NULL );                    /* (3) */
503    }
504
505    return a;
506}
507
508static int bandwidthPulse( void * vmgr );
509
510
511tr_peerMgr*
512tr_peerMgrNew( tr_session * session )
513{
514    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
515
516    m->session = session;
517    m->torrents = tr_ptrArrayNew( );
518    m->incomingHandshakes = tr_ptrArrayNew( );
519    m->globalPoolRawSpeed[TR_CLIENT_TO_PEER] = tr_rcInit( );
520    m->globalPoolRawSpeed[TR_PEER_TO_CLIENT] = tr_rcInit( );
521    m->bandwidthTimer = tr_timerNew( session, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
522    return m;
523}
524
525void
526tr_peerMgrFree( tr_peerMgr * manager )
527{
528    managerLock( manager );
529
530    tr_timerFree( &manager->bandwidthTimer );
531    tr_rcClose( manager->globalPoolRawSpeed[TR_CLIENT_TO_PEER] );
532    tr_rcClose( manager->globalPoolRawSpeed[TR_PEER_TO_CLIENT] );
533
534    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
535     * the item from manager->handshakes, so this is a little roundabout... */
536    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
537        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
538
539    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
540
541    /* free the torrents. */
542    tr_ptrArrayFree( manager->torrents, torrentDestructor );
543
544    managerUnlock( manager );
545    tr_free( manager );
546}
547
548static tr_peer**
549getConnectedPeers( Torrent * t,
550                   int *     setmeCount )
551{
552    int       i, peerCount, connectionCount;
553    tr_peer **peers;
554    tr_peer **ret;
555
556    assert( torrentIsLocked( t ) );
557
558    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
559    ret = tr_new( tr_peer *, peerCount );
560
561    for( i = connectionCount = 0; i < peerCount; ++i )
562        if( peers[i]->msgs )
563            ret[connectionCount++] = peers[i];
564
565    *setmeCount = connectionCount;
566    return ret;
567}
568
569static int
570clientIsDownloadingFrom( const tr_peer * peer )
571{
572    return peer->clientIsInterested && !peer->clientIsChoked;
573}
574
575static int
576clientIsUploadingTo( const tr_peer * peer )
577{
578    return peer->peerIsInterested && !peer->peerIsChoked;
579}
580
581/***
582****
583***/
584
585int
586tr_peerMgrPeerIsSeed( const tr_peerMgr *     mgr,
587                      const uint8_t *        torrentHash,
588                      const struct in_addr * addr )
589{
590    int                      isSeed = FALSE;
591    const Torrent *          t = NULL;
592    const struct peer_atom * atom = NULL;
593
594    t = getExistingTorrent( (tr_peerMgr*)mgr, torrentHash );
595    if( t )
596        atom = getExistingAtom( t, addr );
597    if( atom )
598        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
599
600    return isSeed;
601}
602
603/****
604*****
605*****  REFILL
606*****
607****/
608
609static void
610assertValidPiece( Torrent * t, tr_piece_index_t piece )
611{
612    assert( t );
613    assert( t->tor );
614    assert( piece < t->tor->info.pieceCount );
615}
616
617static int
618getPieceRequests( Torrent * t, tr_piece_index_t piece )
619{
620    assertValidPiece( t, piece );
621
622    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
623}
624
625static void
626incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
627{
628    assertValidPiece( t, piece );
629
630    if( t->pendingRequestCount == NULL )
631        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
632    t->pendingRequestCount[piece]++;
633}
634
635static void
636decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
637{
638    assertValidPiece( t, piece );
639
640    if( t->pendingRequestCount )
641        t->pendingRequestCount[piece]--;
642}
643
644struct tr_refill_piece
645{
646    tr_priority_t    priority;
647    uint32_t         piece;
648    uint32_t         peerCount;
649    int              random;
650    int              pendingRequestCount;
651    int              missingBlockCount;
652};
653
654static int
655compareRefillPiece( const void * aIn,
656                    const void * bIn )
657{
658    const struct tr_refill_piece * a = aIn;
659    const struct tr_refill_piece * b = bIn;
660
661    /* if one piece has a higher priority, it goes first */
662    if( a->priority != b->priority )
663        return a->priority > b->priority ? -1 : 1;
664
665    /* have a per-priority endgame */
666    if( a->pendingRequestCount != b->pendingRequestCount )
667        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
668
669    /* fewer missing pieces goes first */
670    if( a->missingBlockCount != b->missingBlockCount )
671        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
672
673    /* otherwise if one has fewer peers, it goes first */
674    if( a->peerCount != b->peerCount )
675        return a->peerCount < b->peerCount ? -1 : 1;
676
677    /* otherwise go with our random seed */
678    if( a->random != b->random )
679        return a->random < b->random ? -1 : 1;
680
681    return 0;
682}
683
684static tr_piece_index_t *
685getPreferredPieces( Torrent           * t,
686                    tr_piece_index_t  * pieceCount )
687{
688    const tr_torrent  * tor = t->tor;
689    const tr_info     * inf = &tor->info;
690    tr_piece_index_t    i;
691    tr_piece_index_t    poolSize = 0;
692    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
693    int                 peerCount;
694    tr_peer**           peers;
695
696    assert( torrentIsLocked( t ) );
697
698    peers = getConnectedPeers( t, &peerCount );
699
700    /* make a list of the pieces that we want but don't have */
701    for( i = 0; i < inf->pieceCount; ++i )
702        if( !tor->info.pieces[i].dnd
703                && !tr_cpPieceIsComplete( tor->completion, i ) )
704            pool[poolSize++] = i;
705
706    /* sort the pool by which to request next */
707    if( poolSize > 1 )
708    {
709        tr_piece_index_t j;
710        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
711
712        for( j = 0; j < poolSize; ++j )
713        {
714            int k;
715            const tr_piece_index_t piece = pool[j];
716            struct tr_refill_piece * setme = p + j;
717
718            setme->piece = piece;
719            setme->priority = inf->pieces[piece].priority;
720            setme->peerCount = 0;
721            setme->random = tr_cryptoWeakRandInt( INT_MAX );
722            setme->pendingRequestCount = getPieceRequests( t, piece );
723            setme->missingBlockCount
724                         = tr_cpMissingBlocksInPiece( tor->completion, piece );
725
726            for( k = 0; k < peerCount; ++k )
727            {
728                const tr_peer * peer = peers[k];
729                if( peer->peerIsInterested
730                        && !peer->clientIsChoked
731                        && tr_bitfieldHas( peer->have, piece ) )
732                    ++setme->peerCount;
733            }
734        }
735
736        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
737               compareRefillPiece );
738
739        for( j = 0; j < poolSize; ++j )
740            pool[j] = p[j].piece;
741
742        tr_free( p );
743    }
744
745    tr_free( peers );
746
747    *pieceCount = poolSize;
748    return pool;
749}
750
751struct tr_blockIterator
752{
753    Torrent * t;
754    tr_block_index_t blockIndex, blockCount, *blocks;
755    tr_piece_index_t pieceIndex, pieceCount, *pieces;
756};
757
758static struct tr_blockIterator*
759blockIteratorNew( Torrent * t )
760{
761    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
762    i->t = t;
763    i->pieces = getPreferredPieces( t, &i->pieceCount );
764    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCount );
765    return i;
766}
767
768static int
769blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
770{
771    int found;
772    Torrent * t = i->t;
773    tr_torrent * tor = t->tor;
774
775    while( ( i->blockIndex == i->blockCount )
776        && ( i->pieceIndex < i->pieceCount ) )
777    {
778        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
779        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
780        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
781        tr_block_index_t block;
782
783        assert( index < tor->info.pieceCount );
784
785        i->blockCount = 0;
786        i->blockIndex = 0;
787        for( block=b; block!=e; ++block )
788            if( !tr_cpBlockIsComplete( tor->completion, block ) )
789                i->blocks[i->blockCount++] = block;
790    }
791
792    if(( found = ( i->blockIndex < i->blockCount )))
793        *setme = i->blocks[i->blockIndex++];
794
795    return found;
796}
797
798static void
799blockIteratorFree( struct tr_blockIterator * i )
800{
801    tr_free( i->blocks );
802    tr_free( i->pieces );
803    tr_free( i );
804}
805
806static tr_peer**
807getPeersUploadingToClient( Torrent * t,
808                           int *     setmeCount )
809{
810    int j;
811    int peerCount = 0;
812    int retCount = 0;
813    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
814    tr_peer ** ret = tr_new( tr_peer *, peerCount );
815
816    j = 0; /* this is a temporary test to make sure we walk through all the peers */
817    if( peerCount )
818    {
819        /* Get a list of peers we're downloading from.
820           Pick a different starting point each time so all peers
821           get a chance at being the first in line */
822        const int fencepost = tr_cryptoWeakRandInt( peerCount );
823        int i = fencepost;
824        do {
825            if( clientIsDownloadingFrom( peers[i] ) )
826                ret[retCount++] = peers[i];
827            i = ( i + 1 ) % peerCount;
828            ++j;
829        } while( i != fencepost );
830    }
831    assert( j == peerCount );
832    *setmeCount = retCount;
833    return ret;
834}
835
836static uint32_t
837getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
838{
839    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
840    const uint64_t blockPos = tor->blockSize * b;
841    assert( blockPos >= piecePos );
842    return (uint32_t)( blockPos - piecePos );
843}
844
845static int
846refillPulse( void * vtorrent )
847{
848    tr_block_index_t block;
849    int peerCount;
850    int webseedCount;
851    tr_peer ** peers;
852    tr_webseed ** webseeds;
853    struct tr_blockIterator * blockIterator;
854    Torrent * t = vtorrent;
855    tr_torrent * tor = t->tor;
856
857    if( !t->isRunning )
858        return TRUE;
859    if( tr_torrentIsSeed( t->tor ) )
860        return TRUE;
861
862    torrentLock( t );
863    tordbg( t, "Refilling Request Buffers..." );
864
865    blockIterator = blockIteratorNew( t );
866    peers = getPeersUploadingToClient( t, &peerCount );
867    webseedCount = tr_ptrArraySize( t->webseeds );
868    webseeds = tr_memdup( tr_ptrArrayBase( t->webseeds ),
869                          webseedCount * sizeof( tr_webseed* ) );
870
871    while( ( webseedCount || peerCount )
872        && blockIteratorNext( blockIterator, &block ) )
873    {
874        int j;
875        int handled = FALSE;
876
877        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
878        const uint32_t offset = getBlockOffsetInPiece( tor, block );
879        const uint32_t length = tr_torBlockCountBytes( tor, block );
880
881        /* find a peer who can ask for this block */
882        for( j=0; !handled && j<peerCount; )
883        {
884            const int val = tr_peerMsgsAddRequest( peers[j]->msgs,
885                                                   index, offset, length );
886            switch( val )
887            {
888                case TR_ADDREQ_FULL:
889                case TR_ADDREQ_CLIENT_CHOKED:
890                    peers[j] = peers[--peerCount];
891                    break;
892
893                case TR_ADDREQ_MISSING:
894                case TR_ADDREQ_DUPLICATE:
895                    ++j;
896                    break;
897
898                case TR_ADDREQ_OK:
899                    incrementPieceRequests( t, index );
900                    handled = TRUE;
901                    break;
902
903                default:
904                    assert( 0 && "unhandled value" );
905                    break;
906            }
907        }
908
909        /* maybe one of the webseeds can do it */
910        for( j=0; !handled && j<webseedCount; )
911        {
912            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j],
913                                                          index, offset, length );
914            switch( val )
915            {
916                case TR_ADDREQ_FULL:
917                    webseeds[j] = webseeds[--webseedCount];
918                    break;
919
920                case TR_ADDREQ_OK:
921                    incrementPieceRequests( t, index );
922                    handled = TRUE;
923                    break;
924
925                default:
926                    assert( 0 && "unhandled value" );
927                    break;
928            }
929        }
930    }
931
932    /* cleanup */
933    blockIteratorFree( blockIterator );
934    tr_free( webseeds );
935    tr_free( peers );
936
937    t->refillTimer = NULL;
938    torrentUnlock( t );
939    return FALSE;
940}
941
942static void
943broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
944{
945    int i, size;
946    tr_peer ** peers;
947
948    assert( torrentIsLocked( t ) );
949
950    peers = getConnectedPeers( t, &size );
951    for( i=0; i<size; ++i )
952        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
953    tr_free( peers );
954}
955
956static void
957addStrike( Torrent * t,
958           tr_peer * peer )
959{
960    tordbg( t, "increasing peer %s strike count to %d",
961            tr_peerIoAddrStr( &peer->in_addr,
962                              peer->port ), peer->strikes + 1 );
963
964    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
965    {
966        struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
967        atom->myflags |= MYFLAG_BANNED;
968        peer->doPurge = 1;
969        tordbg( t, "banning peer %s",
970               tr_peerIoAddrStr( &atom->addr, atom->port ) );
971    }
972}
973
974static void
975gotBadPiece( Torrent *        t,
976             tr_piece_index_t pieceIndex )
977{
978    tr_torrent *   tor = t->tor;
979    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
980
981    tor->corruptCur += byteCount;
982    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
983}
984
985static void
986refillSoon( Torrent * t )
987{
988    if( t->refillTimer == NULL )
989        t->refillTimer = tr_timerNew( t->manager->session,
990                                      refillPulse, t,
991                                      REFILL_PERIOD_MSEC );
992}
993
994static void
995peerCallbackFunc( void * vpeer,
996                  void * vevent,
997                  void * vt )
998{
999    tr_peer *             peer = vpeer; /* may be NULL if peer is a webseed */
1000    Torrent *             t = (Torrent *) vt;
1001    const tr_peer_event * e = vevent;
1002
1003    torrentLock( t );
1004
1005    switch( e->eventType )
1006    {
1007        case TR_PEER_NEED_REQ:
1008            refillSoon( t );
1009            break;
1010
1011        case TR_PEER_CANCEL:
1012            decrementPieceRequests( t, e->pieceIndex );
1013            break;
1014
1015        case TR_PEER_PEER_GOT_DATA:
1016        {
1017            const time_t now = time( NULL );
1018            tr_torrent * tor = t->tor;
1019            tor->activityDate = now;
1020            tor->uploadedCur += e->length;
1021            if( peer )
1022                tr_rcTransferred ( peer->pieceSpeed[TR_CLIENT_TO_PEER], e->length );
1023            tr_rcTransferred ( tor->pieceSpeed[TR_CLIENT_TO_PEER], e->length );
1024            tr_rcTransferred ( tor->session->pieceSpeed[TR_CLIENT_TO_PEER], e->length );
1025            tr_statsAddUploaded( tor->session, e->length );
1026            if( peer )
1027            {
1028                struct peer_atom * a = getExistingAtom( t, &peer->in_addr );
1029                a->piece_data_time = now;
1030            }
1031            break;
1032        }
1033
1034        case TR_PEER_CLIENT_GOT_DATA:
1035        {
1036            const time_t now = time( NULL );
1037            tr_torrent * tor = t->tor;
1038            tor->activityDate = now;
1039            tr_statsAddDownloaded( tor->session, e->length );
1040            if( peer )
1041                tr_rcTransferred ( peer->pieceSpeed[TR_PEER_TO_CLIENT], e->length );
1042            tr_rcTransferred ( tor->pieceSpeed[TR_PEER_TO_CLIENT], e->length );
1043            tr_rcTransferred ( tor->session->pieceSpeed[TR_PEER_TO_CLIENT], e->length );
1044            /* only add this to downloadedCur if we got it from a peer --
1045             * webseeds shouldn't count against our ratio.  As one tracker
1046             * admin put it, "Those pieces are downloaded directly from the
1047             * content distributor, not the peers, it is the tracker's job
1048             * to manage the swarms, not the web server and does not fit
1049             * into the jurisdiction of the tracker." */
1050            if( peer )
1051                tor->downloadedCur += e->length;
1052            if( peer ) {
1053                struct peer_atom * a = getExistingAtom( t, &peer->in_addr );
1054                a->piece_data_time = now;
1055            }
1056            break;
1057        }
1058
1059        case TR_PEER_PEER_PROGRESS:
1060        {
1061            if( peer )
1062            {
1063                struct peer_atom * atom = getExistingAtom( t,
1064                                                           &peer->in_addr );
1065                const int          peerIsSeed = e->progress >= 1.0;
1066                if( peerIsSeed )
1067                {
1068                    tordbg( t, "marking peer %s as a seed",
1069                           tr_peerIoAddrStr( &atom->addr,
1070                                             atom->port ) );
1071                    atom->flags |= ADDED_F_SEED_FLAG;
1072                }
1073                else
1074                {
1075                    tordbg( t, "marking peer %s as a non-seed",
1076                           tr_peerIoAddrStr( &atom->addr,
1077                                             atom->port ) );
1078                    atom->flags &= ~ADDED_F_SEED_FLAG;
1079                }
1080            }
1081            break;
1082        }
1083
1084        case TR_PEER_CLIENT_GOT_BLOCK:
1085        {
1086            tr_torrent *     tor = t->tor;
1087
1088            tr_block_index_t block = _tr_block( tor, e->pieceIndex,
1089                                                e->offset );
1090
1091            tr_cpBlockAdd( tor->completion, block );
1092            decrementPieceRequests( t, e->pieceIndex );
1093
1094            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1095
1096            if( tr_cpPieceIsComplete( tor->completion, e->pieceIndex ) )
1097            {
1098                const tr_piece_index_t p = e->pieceIndex;
1099                const int              ok = tr_ioTestPiece( tor, p );
1100
1101                if( !ok )
1102                {
1103                    tr_torerr( tor,
1104                              _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1105                              (unsigned long)p );
1106                }
1107
1108                tr_torrentSetHasPiece( tor, p, ok );
1109                tr_torrentSetPieceChecked( tor, p, TRUE );
1110                tr_peerMgrSetBlame( tor->session->peerMgr, tor->info.hash, p, ok );
1111
1112                if( !ok )
1113                    gotBadPiece( t, p );
1114                else
1115                {
1116                    int        i, peerCount;
1117                    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1118                    for( i = 0; i < peerCount; ++i )
1119                        tr_peerMsgsHave( peers[i]->msgs, p );
1120                    tr_free( peers );
1121                }
1122
1123                tr_torrentRecheckCompleteness( tor );
1124            }
1125            break;
1126        }
1127
1128        case TR_PEER_ERROR:
1129            if( e->err == EINVAL )
1130            {
1131                addStrike( t, peer );
1132                peer->doPurge = 1;
1133            }
1134            else if( ( e->err == ERANGE )
1135                  || ( e->err == EMSGSIZE )
1136                  || ( e->err == ENOTCONN ) )
1137            {
1138                /* some protocol error from the peer */
1139                peer->doPurge = 1;
1140            }
1141            else /* a local error, such as an IO error */
1142            {
1143                t->tor->error = e->err;
1144                tr_strlcpy( t->tor->errorString,
1145                            tr_strerror( t->tor->error ),
1146                            sizeof( t->tor->errorString ) );
1147                tr_torrentStop( t->tor );
1148            }
1149            break;
1150
1151        default:
1152            assert( 0 );
1153    }
1154
1155    torrentUnlock( t );
1156}
1157
1158static void
1159ensureAtomExists( Torrent *              t,
1160                  const struct in_addr * addr,
1161                  uint16_t               port,
1162                  uint8_t                flags,
1163                  uint8_t                from )
1164{
1165    if( getExistingAtom( t, addr ) == NULL )
1166    {
1167        struct peer_atom * a;
1168        a = tr_new0( struct peer_atom, 1 );
1169        a->addr = *addr;
1170        a->port = port;
1171        a->flags = flags;
1172        a->from = from;
1173        tordbg( t, "got a new atom: %s",
1174               tr_peerIoAddrStr( &a->addr, a->port ) );
1175        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
1176    }
1177}
1178
1179static int
1180getMaxPeerCount( const tr_torrent * tor )
1181{
1182    return tor->maxConnectedPeers;
1183}
1184
1185static int
1186getPeerCount( const Torrent * t )
1187{
1188    return tr_ptrArraySize( t->peers ) + tr_ptrArraySize(
1189               t->outgoingHandshakes );
1190}
1191
1192/* FIXME: this is kind of a mess. */
1193static int
1194myHandshakeDoneCB( tr_handshake *  handshake,
1195                   tr_peerIo *     io,
1196                   int             isConnected,
1197                   const uint8_t * peer_id,
1198                   void *          vmanager )
1199{
1200    int                    ok = isConnected;
1201    int                    success = FALSE;
1202    uint16_t               port;
1203    const struct in_addr * addr;
1204    tr_peerMgr *           manager = (tr_peerMgr*) vmanager;
1205    Torrent *              t;
1206    tr_handshake *         ours;
1207
1208    assert( io );
1209    assert( isConnected == 0 || isConnected == 1 );
1210
1211    t = tr_peerIoHasTorrentHash( io )
1212        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1213        : NULL;
1214
1215    if( tr_peerIoIsIncoming ( io ) )
1216        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
1217                                        handshake, handshakeCompare );
1218    else if( t )
1219        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
1220                                        handshake, handshakeCompare );
1221    else
1222        ours = handshake;
1223
1224    assert( ours );
1225    assert( ours == handshake );
1226
1227    if( t )
1228        torrentLock( t );
1229
1230    addr = tr_peerIoGetAddress( io, &port );
1231
1232    if( !ok || !t || !t->isRunning )
1233    {
1234        if( t )
1235        {
1236            struct peer_atom * atom = getExistingAtom( t, addr );
1237            if( atom )
1238                ++atom->numFails;
1239        }
1240
1241        tr_peerIoFree( io );
1242    }
1243    else /* looking good */
1244    {
1245        struct peer_atom * atom;
1246        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1247        atom = getExistingAtom( t, addr );
1248        atom->time = time( NULL );
1249
1250        if( atom->myflags & MYFLAG_BANNED )
1251        {
1252            tordbg( t, "banned peer %s tried to reconnect",
1253                   tr_peerIoAddrStr( &atom->addr,
1254                                     atom->port ) );
1255            tr_peerIoFree( io );
1256        }
1257        else if( tr_peerIoIsIncoming( io )
1258               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1259
1260        {
1261            tr_peerIoFree( io );
1262        }
1263        else
1264        {
1265            tr_peer * peer = getExistingPeer( t, addr );
1266
1267            if( peer ) /* we already have this peer */
1268            {
1269                tr_peerIoFree( io );
1270            }
1271            else
1272            {
1273                peer = getPeer( t, addr );
1274                tr_free( peer->client );
1275
1276                if( !peer_id )
1277                    peer->client = NULL;
1278                else
1279                {
1280                    char client[128];
1281                    tr_clientForId( client, sizeof( client ), peer_id );
1282                    peer->client = tr_strdup( client );
1283                }
1284                peer->port = port;
1285                peer->io = io;
1286                peer->msgs =
1287                    tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t,
1288                                    &peer->msgsTag );
1289
1290                success = TRUE;
1291            }
1292        }
1293    }
1294
1295    if( t )
1296        torrentUnlock( t );
1297
1298    return success;
1299}
1300
1301void
1302tr_peerMgrAddIncoming( tr_peerMgr *     manager,
1303                       struct in_addr * addr,
1304                       uint16_t         port,
1305                       int              socket )
1306{
1307    managerLock( manager );
1308
1309    if( tr_sessionIsAddressBlocked( manager->session, addr ) )
1310    {
1311        tr_dbg( "Banned IP address \"%s\" tried to connect to us",
1312               inet_ntoa( *addr ) );
1313        tr_netClose( socket );
1314    }
1315    else if( getExistingHandshake( manager->incomingHandshakes, addr ) )
1316    {
1317        tr_netClose( socket );
1318    }
1319    else /* we don't have a connetion to them yet... */
1320    {
1321        tr_peerIo *    io;
1322        tr_handshake * handshake;
1323
1324        io = tr_peerIoNewIncoming( manager->session, addr, port, socket );
1325
1326        handshake = tr_handshakeNew( io,
1327                                     manager->session->encryptionMode,
1328                                     myHandshakeDoneCB,
1329                                     manager );
1330
1331        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake,
1332                                 handshakeCompare );
1333    }
1334
1335    managerUnlock( manager );
1336}
1337
1338void
1339tr_peerMgrAddPex( tr_peerMgr *    manager,
1340                  const uint8_t * torrentHash,
1341                  uint8_t         from,
1342                  const tr_pex *  pex )
1343{
1344    Torrent * t;
1345
1346    managerLock( manager );
1347
1348    t = getExistingTorrent( manager, torrentHash );
1349    if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->in_addr ) )
1350        ensureAtomExists( t, &pex->in_addr, pex->port, pex->flags, from );
1351
1352    managerUnlock( manager );
1353}
1354
1355tr_pex *
1356tr_peerMgrCompactToPex( const void *    compact,
1357                        size_t          compactLen,
1358                        const uint8_t * added_f,
1359                        size_t          added_f_len,
1360                        size_t *        pexCount )
1361{
1362    size_t          i;
1363    size_t          n = compactLen / 6;
1364    const uint8_t * walk = compact;
1365    tr_pex *        pex = tr_new0( tr_pex, n );
1366
1367    for( i = 0; i < n; ++i )
1368    {
1369        memcpy( &pex[i].in_addr, walk, 4 ); walk += 4;
1370        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1371        if( added_f && ( n == added_f_len ) )
1372            pex[i].flags = added_f[i];
1373    }
1374
1375    *pexCount = n;
1376    return pex;
1377}
1378
1379/**
1380***
1381**/
1382
1383void
1384tr_peerMgrSetBlame( tr_peerMgr *     manager,
1385                    const uint8_t *  torrentHash,
1386                    tr_piece_index_t pieceIndex,
1387                    int              success )
1388{
1389    if( !success )
1390    {
1391        int        peerCount, i;
1392        Torrent *  t = getExistingTorrent( manager, torrentHash );
1393        tr_peer ** peers;
1394
1395        assert( torrentIsLocked( t ) );
1396
1397        peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1398        for( i = 0; i < peerCount; ++i )
1399        {
1400            tr_peer * peer = peers[i];
1401            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1402            {
1403                tordbg(
1404                    t,
1405                    "peer %s contributed to corrupt piece (%d); now has %d strikes",
1406                    tr_peerIoAddrStr( &peer->in_addr, peer->port ),
1407                    pieceIndex, (int)peer->strikes + 1 );
1408                addStrike( t, peer );
1409            }
1410        }
1411    }
1412}
1413
1414int
1415tr_pexCompare( const void * va,
1416               const void * vb )
1417{
1418    const tr_pex * a = va;
1419    const tr_pex * b = vb;
1420    int            i =
1421        memcmp( &a->in_addr, &b->in_addr, sizeof( struct in_addr ) );
1422
1423    if( i ) return i;
1424    if( a->port < b->port ) return -1;
1425    if( a->port > b->port ) return 1;
1426    return 0;
1427}
1428
1429int tr_pexCompare( const void * a,
1430                   const void * b );
1431
1432static int
1433peerPrefersCrypto( const tr_peer * peer )
1434{
1435    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1436        return TRUE;
1437
1438    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1439        return FALSE;
1440
1441    return tr_peerIoIsEncrypted( peer->io );
1442}
1443
1444int
1445tr_peerMgrGetPeers( tr_peerMgr *    manager,
1446                    const uint8_t * torrentHash,
1447                    tr_pex **       setme_pex )
1448{
1449    int peerCount = 0;
1450    const Torrent *  t;
1451
1452    managerLock( manager );
1453
1454    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1455    if( !t )
1456    {
1457        *setme_pex = NULL;
1458    }
1459    else
1460    {
1461        int i;
1462        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1463        tr_pex * pex = tr_new( tr_pex, peerCount );
1464        tr_pex * walk = pex;
1465
1466        for( i = 0; i < peerCount; ++i, ++walk )
1467        {
1468            const tr_peer * peer = peers[i];
1469            walk->in_addr = peer->in_addr;
1470            walk->port = peer->port;
1471            walk->flags = 0;
1472            if( peerPrefersCrypto( peer ) ) walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1473            if( peer->progress >= 1.0 ) walk->flags |= ADDED_F_SEED_FLAG;
1474        }
1475
1476        assert( ( walk - pex ) == peerCount );
1477        qsort( pex, peerCount, sizeof( tr_pex ), tr_pexCompare );
1478        *setme_pex = pex;
1479    }
1480
1481    managerUnlock( manager );
1482    return peerCount;
1483}
1484
1485static int reconnectPulse( void * vtorrent );
1486
1487static int rechokePulse( void * vtorrent );
1488
1489void
1490tr_peerMgrStartTorrent( tr_peerMgr *    manager,
1491                        const uint8_t * torrentHash )
1492{
1493    Torrent * t;
1494
1495    managerLock( manager );
1496
1497    t = getExistingTorrent( manager, torrentHash );
1498
1499    assert( t );
1500    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1501    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1502
1503    if( !t->isRunning )
1504    {
1505        t->isRunning = 1;
1506
1507        t->reconnectTimer = tr_timerNew( t->manager->session,
1508                                         reconnectPulse, t,
1509                                         RECONNECT_PERIOD_MSEC );
1510
1511        t->rechokeTimer = tr_timerNew( t->manager->session,
1512                                       rechokePulse, t,
1513                                       RECHOKE_PERIOD_MSEC );
1514
1515        reconnectPulse( t );
1516
1517        rechokePulse( t );
1518
1519        if( !tr_ptrArrayEmpty( t->webseeds ) )
1520            refillSoon( t );
1521    }
1522
1523    managerUnlock( manager );
1524}
1525
1526static void
1527stopTorrent( Torrent * t )
1528{
1529    assert( torrentIsLocked( t ) );
1530
1531    t->isRunning = 0;
1532    tr_timerFree( &t->rechokeTimer );
1533    tr_timerFree( &t->reconnectTimer );
1534
1535    /* disconnect the peers. */
1536    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)peerDestructor );
1537    tr_ptrArrayClear( t->peers );
1538
1539    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1540     * which removes the handshake from t->outgoingHandshakes... */
1541    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1542        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1543}
1544
1545void
1546tr_peerMgrStopTorrent( tr_peerMgr *    manager,
1547                       const uint8_t * torrentHash )
1548{
1549    managerLock( manager );
1550
1551    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1552
1553    managerUnlock( manager );
1554}
1555
1556void
1557tr_peerMgrAddTorrent( tr_peerMgr * manager,
1558                      tr_torrent * tor )
1559{
1560    Torrent * t;
1561
1562    managerLock( manager );
1563
1564    assert( tor );
1565    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1566
1567    t = torrentConstructor( manager, tor );
1568    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1569
1570    managerUnlock( manager );
1571}
1572
1573void
1574tr_peerMgrRemoveTorrent( tr_peerMgr *    manager,
1575                         const uint8_t * torrentHash )
1576{
1577    Torrent * t;
1578
1579    managerLock( manager );
1580
1581    t = getExistingTorrent( manager, torrentHash );
1582    assert( t );
1583    stopTorrent( t );
1584    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1585    torrentDestructor( t );
1586
1587    managerUnlock( manager );
1588}
1589
1590void
1591tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1592                               const uint8_t *    torrentHash,
1593                               int8_t *           tab,
1594                               unsigned int       tabCount )
1595{
1596    tr_piece_index_t   i;
1597    const Torrent *    t;
1598    const tr_torrent * tor;
1599    float              interval;
1600    int                isComplete;
1601    int                peerCount;
1602    const tr_peer **   peers;
1603
1604    managerLock( manager );
1605
1606    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1607    tor = t->tor;
1608    interval = tor->info.pieceCount / (float)tabCount;
1609    isComplete = tor
1610                 && ( tr_cpGetStatus ( tor->completion ) == TR_CP_COMPLETE );
1611    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1612
1613    memset( tab, 0, tabCount );
1614
1615    for( i = 0; tor && i < tabCount; ++i )
1616    {
1617        const int piece = i * interval;
1618
1619        if( isComplete || tr_cpPieceIsComplete( tor->completion, piece ) )
1620            tab[i] = -1;
1621        else if( peerCount )
1622        {
1623            int j;
1624            for( j = 0; j < peerCount; ++j )
1625                if( tr_bitfieldHas( peers[j]->have, i ) )
1626                    ++tab[i];
1627        }
1628    }
1629
1630    managerUnlock( manager );
1631}
1632
1633/* Returns the pieces that are available from peers */
1634tr_bitfield*
1635tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1636                        const uint8_t *    torrentHash )
1637{
1638    int           i, size;
1639    Torrent *     t;
1640    tr_peer **    peers;
1641    tr_bitfield * pieces;
1642
1643    managerLock( manager );
1644
1645    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1646    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1647    peers = getConnectedPeers( t, &size );
1648    for( i = 0; i < size; ++i )
1649        tr_bitfieldOr( pieces, peers[i]->have );
1650
1651    managerUnlock( manager );
1652    tr_free( peers );
1653    return pieces;
1654}
1655
1656int
1657tr_peerMgrHasConnections( const tr_peerMgr * manager,
1658                          const uint8_t *    torrentHash )
1659{
1660    int             ret;
1661    const Torrent * t;
1662
1663    managerLock( manager );
1664
1665    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1666    ret = t && ( !tr_ptrArrayEmpty( t->peers )
1667               || !tr_ptrArrayEmpty( t->webseeds ) );
1668
1669    managerUnlock( manager );
1670    return ret;
1671}
1672
1673void
1674tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1675                        const uint8_t *    torrentHash,
1676                        int *              setmePeersKnown,
1677                        int *              setmePeersConnected,
1678                        int *              setmeSeedsConnected,
1679                        int *              setmeWebseedsSendingToUs,
1680                        int *              setmePeersSendingToUs,
1681                        int *              setmePeersGettingFromUs,
1682                        int *              setmePeersFrom )
1683{
1684    int                 i, size;
1685    const Torrent *     t;
1686    const tr_peer **    peers;
1687    const tr_webseed ** webseeds;
1688
1689    managerLock( manager );
1690
1691    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1692    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1693
1694    *setmePeersKnown           = tr_ptrArraySize( t->pool );
1695    *setmePeersConnected       = 0;
1696    *setmeSeedsConnected       = 0;
1697    *setmePeersGettingFromUs   = 0;
1698    *setmePeersSendingToUs     = 0;
1699    *setmeWebseedsSendingToUs  = 0;
1700
1701    for( i = 0; i < TR_PEER_FROM__MAX; ++i )
1702        setmePeersFrom[i] = 0;
1703
1704    for( i = 0; i < size; ++i )
1705    {
1706        const tr_peer *          peer = peers[i];
1707        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1708
1709        if( peer->io == NULL ) /* not connected */
1710            continue;
1711
1712        ++ * setmePeersConnected;
1713
1714        ++setmePeersFrom[atom->from];
1715
1716        if( clientIsDownloadingFrom( peer ) )
1717            ++ * setmePeersSendingToUs;
1718
1719        if( clientIsUploadingTo( peer ) )
1720            ++ * setmePeersGettingFromUs;
1721
1722        if( atom->flags & ADDED_F_SEED_FLAG )
1723            ++ * setmeSeedsConnected;
1724    }
1725
1726    webseeds = (const tr_webseed **) tr_ptrArrayPeek( t->webseeds, &size );
1727    for( i = 0; i < size; ++i )
1728    {
1729        if( tr_webseedIsActive( webseeds[i] ) )
1730            ++ * setmeWebseedsSendingToUs;
1731    }
1732
1733    managerUnlock( manager );
1734}
1735
1736float*
1737tr_peerMgrWebSpeeds( const tr_peerMgr * manager,
1738                     const uint8_t *    torrentHash )
1739{
1740    const Torrent *     t;
1741    const tr_webseed ** webseeds;
1742    int                 i;
1743    int                 webseedCount;
1744    float *             ret;
1745
1746    assert( manager );
1747    managerLock( manager );
1748
1749    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1750    webseeds = (const tr_webseed**) tr_ptrArrayPeek( t->webseeds,
1751                                                     &webseedCount );
1752    assert( webseedCount == t->tor->info.webseedCount );
1753    ret = tr_new0( float, webseedCount );
1754
1755    for( i = 0; i < webseedCount; ++i )
1756        if( !tr_webseedGetSpeed( webseeds[i], &ret[i] ) )
1757            ret[i] = -1.0;
1758
1759    managerUnlock( manager );
1760    return ret;
1761}
1762
1763double
1764tr_peerGetPieceSpeed( const tr_peer    * peer,
1765                      tr_direction       direction )
1766{
1767    assert( peer );
1768    assert( direction==TR_CLIENT_TO_PEER || direction==TR_PEER_TO_CLIENT );
1769
1770    return tr_rcRate( peer->pieceSpeed[direction] );
1771}
1772
1773
1774struct tr_peer_stat *
1775tr_peerMgrPeerStats( const   tr_peerMgr  * manager,
1776                     const   uint8_t     * torrentHash,
1777                     int                 * setmeCount UNUSED )
1778{
1779    int             i, size;
1780    const Torrent * t;
1781    tr_peer **      peers;
1782    tr_peer_stat *  ret;
1783
1784    assert( manager );
1785    managerLock( manager );
1786
1787    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1788    peers = getConnectedPeers( (Torrent*)t, &size );
1789    ret = tr_new0( tr_peer_stat, size );
1790
1791    for( i = 0; i < size; ++i )
1792    {
1793        char *                   pch;
1794        const tr_peer *          peer = peers[i];
1795        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1796        tr_peer_stat *           stat = ret + i;
1797
1798        tr_netNtop( &peer->in_addr, stat->addr, sizeof( stat->addr ) );
1799        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
1800                   sizeof( stat->client ) );
1801        stat->port               = peer->port;
1802        stat->from               = atom->from;
1803        stat->progress           = peer->progress;
1804        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1805        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER );
1806        stat->rateToClient       = tr_peerGetPieceSpeed( peer, TR_PEER_TO_CLIENT );
1807        stat->peerIsChoked       = peer->peerIsChoked;
1808        stat->peerIsInterested   = peer->peerIsInterested;
1809        stat->clientIsChoked     = peer->clientIsChoked;
1810        stat->clientIsInterested = peer->clientIsInterested;
1811        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1812        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1813        stat->isUploadingTo      = clientIsUploadingTo( peer );
1814
1815        pch = stat->flagStr;
1816        if( t->optimistic == peer ) *pch++ = 'O';
1817        if( stat->isDownloadingFrom ) *pch++ = 'D';
1818        else if( stat->clientIsInterested ) *pch++ = 'd';
1819        if( stat->isUploadingTo ) *pch++ = 'U';
1820        else if( stat->peerIsInterested ) *pch++ = 'u';
1821        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ =
1822                'K';
1823        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1824        if( stat->isEncrypted ) *pch++ = 'E';
1825        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1826        if( stat->isIncoming ) *pch++ = 'I';
1827        *pch = '\0';
1828    }
1829
1830    *setmeCount = size;
1831    tr_free( peers );
1832
1833    managerUnlock( manager );
1834    return ret;
1835}
1836
1837/**
1838***
1839**/
1840
1841struct ChokeData
1842{
1843    unsigned int    doUnchoke    : 1;
1844    unsigned int    isInterested : 1;
1845    double          rateToClient;
1846    double          rateToPeer;
1847    tr_peer *       peer;
1848};
1849
1850static int
1851tr_compareDouble( double a,
1852                  double b )
1853{
1854    if( a < b ) return -1;
1855    if( a > b ) return 1;
1856    return 0;
1857}
1858
1859static int
1860compareChoke( const void * va,
1861              const void * vb )
1862{
1863    const struct ChokeData * a = va;
1864    const struct ChokeData * b = vb;
1865    int                      diff = 0;
1866
1867    if( diff == 0 ) /* prefer higher dl speeds */
1868        diff = -tr_compareDouble( a->rateToClient, b->rateToClient );
1869    if( diff == 0 ) /* prefer higher ul speeds */
1870        diff = -tr_compareDouble( a->rateToPeer, b->rateToPeer );
1871    if( diff == 0 ) /* prefer unchoked */
1872        diff = (int)a->peer->peerIsChoked - (int)b->peer->peerIsChoked;
1873
1874    return diff;
1875}
1876
1877static int
1878isNew( const tr_peer * peer )
1879{
1880    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1881}
1882
1883static int
1884isSame( const tr_peer * peer )
1885{
1886    return peer && peer->client && strstr( peer->client, "Transmission" );
1887}
1888
1889/**
1890***
1891**/
1892
1893static void
1894rechoke( Torrent * t )
1895{
1896    int                i, peerCount, size, unchokedInterested;
1897    tr_peer **         peers = getConnectedPeers( t, &peerCount );
1898    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1899    const int          chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
1900
1901    assert( torrentIsLocked( t ) );
1902
1903    /* sort the peers by preference and rate */
1904    for( i = 0, size = 0; i < peerCount; ++i )
1905    {
1906        tr_peer * peer = peers[i];
1907        if( peer->progress >= 1.0 ) /* choke all seeds */
1908            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1909        else if( chokeAll )
1910            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1911        else {
1912            struct ChokeData * n = &choke[size++];
1913            n->peer         = peer;
1914            n->isInterested = peer->peerIsInterested;
1915            n->rateToPeer   = tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER );
1916            n->rateToClient = tr_peerGetPieceSpeed( peer, TR_PEER_TO_CLIENT );
1917        }
1918    }
1919
1920    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
1921
1922    /**
1923     * Reciprocation and number of uploads capping is managed by unchoking
1924     * the N peers which have the best upload rate and are interested.
1925     * This maximizes the client's download rate. These N peers are
1926     * referred to as downloaders, because they are interested in downloading
1927     * from the client.
1928     *
1929     * Peers which have a better upload rate (as compared to the downloaders)
1930     * but aren't interested get unchoked. If they become interested, the
1931     * downloader with the worst upload rate gets choked. If a client has
1932     * a complete file, it uses its upload rate rather than its download
1933     * rate to decide which peers to unchoke.
1934     */
1935    unchokedInterested = 0;
1936    for( i = 0; i < size && unchokedInterested < MAX_UNCHOKED_PEERS; ++i )
1937    {
1938        choke[i].doUnchoke = 1;
1939        if( choke[i].isInterested )
1940            ++unchokedInterested;
1941    }
1942
1943    /* optimistic unchoke */
1944    if( i < size )
1945    {
1946        int                n;
1947        struct ChokeData * c;
1948        tr_ptrArray *      randPool = tr_ptrArrayNew( );
1949
1950        for( ; i < size; ++i )
1951        {
1952            if( choke[i].isInterested )
1953            {
1954                const tr_peer * peer = choke[i].peer;
1955                int             x = 1, y;
1956                if( isNew( peer ) ) x *= 3;
1957                if( isSame( peer ) ) x *= 3;
1958                for( y = 0; y < x; ++y )
1959                    tr_ptrArrayAppend( randPool, &choke[i] );
1960            }
1961        }
1962
1963        if( ( n = tr_ptrArraySize( randPool ) ) )
1964        {
1965            c = tr_ptrArrayNth( randPool, tr_cryptoWeakRandInt( n ) );
1966            c->doUnchoke = 1;
1967            t->optimistic = c->peer;
1968        }
1969
1970        tr_ptrArrayFree( randPool, NULL );
1971    }
1972
1973    for( i = 0; i < size; ++i )
1974        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1975
1976    /* cleanup */
1977    tr_free( choke );
1978    tr_free( peers );
1979}
1980
1981static int
1982rechokePulse( void * vtorrent )
1983{
1984    Torrent * t = vtorrent;
1985
1986    torrentLock( t );
1987    rechoke( t );
1988    torrentUnlock( t );
1989    return TRUE;
1990}
1991
1992/***
1993****
1994****  Life and Death
1995****
1996***/
1997
1998static int
1999shouldPeerBeClosed( const Torrent * t,
2000                    const tr_peer * peer,
2001                    int             peerCount )
2002{
2003    const tr_torrent *       tor = t->tor;
2004    const time_t             now = time( NULL );
2005    const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
2006
2007    /* if it's marked for purging, close it */
2008    if( peer->doPurge )
2009    {
2010        tordbg( t, "purging peer %s because its doPurge flag is set",
2011               tr_peerIoAddrStr( &atom->addr,
2012                                 atom->port ) );
2013        return TRUE;
2014    }
2015
2016    /* if we're seeding and the peer has everything we have,
2017     * and enough time has passed for a pex exchange, then disconnect */
2018    if( tr_torrentIsSeed( tor ) )
2019    {
2020        int peerHasEverything;
2021        if( atom->flags & ADDED_F_SEED_FLAG )
2022            peerHasEverything = TRUE;
2023        else if( peer->progress < tr_cpPercentDone( tor->completion ) )
2024            peerHasEverything = FALSE;
2025        else
2026        {
2027            tr_bitfield * tmp =
2028                tr_bitfieldDup( tr_cpPieceBitfield( tor->completion ) );
2029            tr_bitfieldDifference( tmp, peer->have );
2030            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2031            tr_bitfieldFree( tmp );
2032        }
2033        if( peerHasEverything
2034          && ( !tr_torrentAllowsPex( tor ) || ( now - atom->time >= 30 ) ) )
2035        {
2036            tordbg( t, "purging peer %s because we're both seeds",
2037                   tr_peerIoAddrStr( &atom->addr,
2038                                     atom->port ) );
2039            return TRUE;
2040        }
2041    }
2042
2043    /* disconnect if it's been too long since piece data has been transferred.
2044     * this is on a sliding scale based on number of available peers... */
2045    {
2046        const int    relaxStrictnessIfFewerThanN =
2047            (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2048        /* if we have >= relaxIfFewerThan, strictness is 100%.
2049         * if we have zero connections, strictness is 0% */
2050        const float  strictness = peerCount >= relaxStrictnessIfFewerThanN
2051                                  ? 1.0
2052                                  : peerCount /
2053                                  (float)relaxStrictnessIfFewerThanN;
2054        const int    lo = MIN_UPLOAD_IDLE_SECS;
2055        const int    hi = MAX_UPLOAD_IDLE_SECS;
2056        const int    limit = lo + ( ( hi - lo ) * strictness );
2057        const time_t then = peer->pieceDataActivityDate;
2058        const int    idleTime = then ? ( now - then ) : 0;
2059        if( idleTime > limit )
2060        {
2061            tordbg(
2062                t,
2063                "purging peer %s because it's been %d secs since we shared anything",
2064                tr_peerIoAddrStr( &atom->addr, atom->port ), idleTime );
2065            return TRUE;
2066        }
2067    }
2068
2069    return FALSE;
2070}
2071
2072static tr_peer **
2073getPeersToClose( Torrent * t,
2074                 int *     setmeSize )
2075{
2076    int               i, peerCount, outsize;
2077    tr_peer **        peers = (tr_peer**) tr_ptrArrayPeek( t->peers,
2078                                                           &peerCount );
2079    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2080
2081    assert( torrentIsLocked( t ) );
2082
2083    for( i = outsize = 0; i < peerCount; ++i )
2084        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
2085            ret[outsize++] = peers[i];
2086
2087    *setmeSize = outsize;
2088    return ret;
2089}
2090
2091static int
2092compareCandidates( const void * va,
2093                   const void * vb )
2094{
2095    const struct peer_atom * a = *(const struct peer_atom**) va;
2096    const struct peer_atom * b = *(const struct peer_atom**) vb;
2097
2098    /* <Charles> Here we would probably want to try reconnecting to
2099     * peers that had most recently given us data. Lots of users have
2100     * trouble with resets due to their routers and/or ISPs. This way we
2101     * can quickly recover from an unwanted reset. So we sort
2102     * piece_data_time in descending order.
2103     */
2104
2105    if( a->piece_data_time != b->piece_data_time )
2106        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2107
2108    if( a->numFails != b->numFails )
2109        return a->numFails < b->numFails ? -1 : 1;
2110
2111    if( a->time != b->time )
2112        return a->time < b->time ? -1 : 1;
2113
2114    return 0;
2115}
2116
2117static int
2118getReconnectIntervalSecs( const struct peer_atom * atom )
2119{
2120    int          sec;
2121    const time_t now = time( NULL );
2122
2123    /* if we were recently connected to this peer and transferring piece
2124     * data, try to reconnect to them sooner rather that later -- we don't
2125     * want network troubles to get in the way of a good peer. */
2126    if( ( now - atom->piece_data_time ) <=
2127       ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2128        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2129
2130    /* don't allow reconnects more often than our minimum */
2131    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2132        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2133
2134    /* otherwise, the interval depends on how many times we've tried
2135     * and failed to connect to the peer */
2136    else switch( atom->numFails )
2137        {
2138            case 0:
2139                sec = 0; break;
2140
2141            case 1:
2142                sec = 5; break;
2143
2144            case 2:
2145                sec = 2 * 60; break;
2146
2147            case 3:
2148                sec = 15 * 60; break;
2149
2150            case 4:
2151                sec = 30 * 60; break;
2152
2153            case 5:
2154                sec = 60 * 60; break;
2155
2156            default:
2157                sec = 120 * 60; break;
2158        }
2159
2160    return sec;
2161}
2162
2163static struct peer_atom **
2164getPeerCandidates(                               Torrent * t,
2165                                           int * setmeSize )
2166{
2167    int                 i, atomCount, retCount;
2168    struct peer_atom ** atoms;
2169    struct peer_atom ** ret;
2170    const time_t        now = time( NULL );
2171    const int           seed = tr_torrentIsSeed( t->tor );
2172
2173    assert( torrentIsLocked( t ) );
2174
2175    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &atomCount );
2176    ret = tr_new( struct peer_atom*, atomCount );
2177    for( i = retCount = 0; i < atomCount; ++i )
2178    {
2179        int                interval;
2180        struct peer_atom * atom = atoms[i];
2181
2182        /* peer fed us too much bad data ... we only keep it around
2183         * now to weed it out in case someone sends it to us via pex */
2184        if( atom->myflags & MYFLAG_BANNED )
2185            continue;
2186
2187        /* peer was unconnectable before, so we're not going to keep trying.
2188         * this is needs a separate flag from `banned', since if they try
2189         * to connect to us later, we'll let them in */
2190        if( atom->myflags & MYFLAG_UNREACHABLE )
2191            continue;
2192
2193        /* we don't need two connections to the same peer... */
2194        if( peerIsInUse( t, &atom->addr ) )
2195            continue;
2196
2197        /* no need to connect if we're both seeds... */
2198        if( seed && ( atom->flags & ADDED_F_SEED_FLAG ) )
2199            continue;
2200
2201        /* don't reconnect too often */
2202        interval = getReconnectIntervalSecs( atom );
2203        if( ( now - atom->time ) < interval )
2204        {
2205            tordbg(
2206                t,
2207                "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2208                i, tr_peerIoAddrStr( &atom->addr,
2209                                     atom->port ), interval );
2210            continue;
2211        }
2212
2213        /* Don't connect to peers in our blocklist */
2214        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2215            continue;
2216
2217        ret[retCount++] = atom;
2218    }
2219
2220    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2221    *setmeSize = retCount;
2222    return ret;
2223}
2224
2225static int
2226reconnectPulse( void * vtorrent )
2227{
2228    Torrent *     t = vtorrent;
2229    static time_t prevTime = 0;
2230    static int    newConnectionsThisSecond = 0;
2231    time_t        now;
2232
2233    torrentLock( t );
2234
2235    now = time( NULL );
2236    if( prevTime != now )
2237    {
2238        prevTime = now;
2239        newConnectionsThisSecond = 0;
2240    }
2241
2242    if( !t->isRunning )
2243    {
2244        removeAllPeers( t );
2245    }
2246    else
2247    {
2248        int                 i, nCandidates, nBad;
2249        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
2250        struct tr_peer **   connections = getPeersToClose( t, &nBad );
2251
2252        if( nBad || nCandidates )
2253            tordbg(
2254                t, "reconnect pulse for [%s]: %d bad connections, "
2255                   "%d connection candidates, %d atoms, max per pulse is %d",
2256                t->tor->info.name, nBad, nCandidates,
2257                tr_ptrArraySize( t->pool ),
2258                (int)MAX_RECONNECTIONS_PER_PULSE );
2259
2260        /* disconnect some peers.
2261           if we transferred piece data, then they might be good peers,
2262           so reset their `numFails' weight to zero.  otherwise we connected
2263           to them fruitlessly, so mark it as another fail */
2264        for( i = 0; i < nBad; ++i )
2265        {
2266            tr_peer *          peer = connections[i];
2267            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
2268            if( peer->pieceDataActivityDate )
2269                atom->numFails = 0;
2270            else
2271                ++atom->numFails;
2272            tordbg( t, "removing bad peer %s",
2273                   tr_peerIoGetAddrStr( peer->io ) );
2274            removePeer( t, peer );
2275        }
2276
2277        /* add some new ones */
2278        for( i = 0;    ( i < nCandidates )
2279           && ( i < MAX_RECONNECTIONS_PER_PULSE )
2280           && ( getPeerCount( t ) < getMaxPeerCount( t->tor ) )
2281           && ( newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND );
2282             ++i )
2283        {
2284            tr_peerMgr *       mgr = t->manager;
2285            struct peer_atom * atom = candidates[i];
2286            tr_peerIo *        io;
2287
2288            tordbg( t, "Starting an OUTGOING connection with %s",
2289                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2290
2291            io =
2292                tr_peerIoNewOutgoing( mgr->session, &atom->addr, atom->port,
2293                                      t->hash );
2294            if( io == NULL )
2295            {
2296                atom->myflags |= MYFLAG_UNREACHABLE;
2297            }
2298            else
2299            {
2300                tr_handshake * handshake = tr_handshakeNew(
2301                    io,
2302                    mgr->session->
2303                    encryptionMode,
2304                    myHandshakeDoneCB,
2305                    mgr );
2306
2307                assert( tr_peerIoGetTorrentHash( io ) );
2308
2309                ++newConnectionsThisSecond;
2310
2311                tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake,
2312                                         handshakeCompare );
2313            }
2314
2315            atom->time = time( NULL );
2316        }
2317
2318        /* cleanup */
2319        tr_free( connections );
2320        tr_free( candidates );
2321    }
2322
2323    torrentUnlock( t );
2324    return TRUE;
2325}
2326
2327/****
2328*****
2329*****  BANDWIDTH ALLOCATION
2330*****
2331****/
2332
2333static double
2334allocateHowMuch( double                  desired_average_kb_per_sec,
2335                 const tr_ratecontrol  * ratecontrol )
2336{
2337    const int pulses_per_history = TR_RATECONTROL_HISTORY_MSEC / BANDWIDTH_PERIOD_MSEC;
2338    const double seconds_per_pulse = BANDWIDTH_PERIOD_MSEC / 1000.0;
2339    const double baseline_bytes_per_pulse = desired_average_kb_per_sec * 1024.0 * seconds_per_pulse;
2340    const double min = baseline_bytes_per_pulse * 0.85;
2341    const double max = baseline_bytes_per_pulse * 1.15;
2342    const double current_bytes_per_pulse = tr_rcRate( ratecontrol ) * 1024.0 * seconds_per_pulse;
2343    const double next_pulse_bytes = baseline_bytes_per_pulse * ( pulses_per_history + 1 )
2344                                  - ( current_bytes_per_pulse * pulses_per_history );
2345    double clamped;
2346
2347    /* clamp the return value to lessen oscillation */
2348    clamped = next_pulse_bytes;
2349    clamped = MAX( clamped, min );
2350    clamped = MIN( clamped, max );
2351
2352fprintf( stderr, "desiredAvgKB is %.2f, rate is %.2f, allocating %.2f (%.2f)\n",
2353         desired_average_kb_per_sec,
2354         tr_rcRate( ratecontrol ),
2355         clamped/1024.0,
2356         next_pulse_bytes/1024.0 );
2357
2358    return clamped;
2359}
2360
2361/**
2362 * Distributes a fixed amount of bandwidth among a set of peers.
2363 *
2364 * @param peerArray peers whose client-to-peer bandwidth will be set
2365 * @param direction whether to allocate upload or download bandwidth
2366 * @param history recent bandwidth history for these peers
2367 * @param desiredAvgKB overall bandwidth goal for this set of peers
2368 */
2369static void
2370setPeerBandwidth( tr_ptrArray          * peerArray,
2371                  const tr_direction     direction,
2372                  const tr_ratecontrol * ratecontrol,
2373                  double                 desiredAvgKB )
2374{
2375    const int    peerCount = tr_ptrArraySize( peerArray );
2376    const double bytes = allocateHowMuch( desiredAvgKB, ratecontrol );
2377    const double welfareBytes = MIN( 2048, bytes * 0.2 );
2378    const double meritBytes = MAX( 0, bytes - welfareBytes );
2379    tr_peer **   peers = (tr_peer**) tr_ptrArrayBase( peerArray );
2380    tr_peer **   candidates = tr_new( tr_peer *, peerCount );
2381    int          i;
2382    int          candidateCount;
2383    double       welfare;
2384    size_t       bytesUsed;
2385
2386    assert( meritBytes >= 0.0 );
2387    assert( welfareBytes >= 0.0 );
2388    assert( direction == TR_UP || direction == TR_DOWN );
2389
2390    for( i = candidateCount = 0; i < peerCount; ++i )
2391        if( tr_peerIoWantsBandwidth( peers[i]->io, direction ) )
2392            candidates[candidateCount++] = peers[i];
2393        else
2394            tr_peerIoSetBandwidth( peers[i]->io, direction, 0 );
2395
2396    for( i = bytesUsed = 0; i < candidateCount; ++i )
2397        bytesUsed += tr_peerIoGetBandwidthUsed( candidates[i]->io,
2398                                                direction );
2399
2400    welfare = welfareBytes / candidateCount;
2401
2402    for( i = 0; i < candidateCount; ++i )
2403    {
2404        tr_peer *    peer = candidates[i];
2405        const double merit = bytesUsed
2406                             ? ( meritBytes *
2407                                tr_peerIoGetBandwidthUsed( peer->io,
2408                                                           direction ) ) /
2409                             bytesUsed
2410                             : ( meritBytes / candidateCount );
2411        tr_peerIoSetBandwidth( peer->io, direction, merit + welfare );
2412    }
2413
2414    /* cleanup */
2415    tr_free( candidates );
2416}
2417
2418static size_t
2419countHandshakeBandwidth( tr_ptrArray * handshakes,
2420                         tr_direction  direction )
2421{
2422    const int n = tr_ptrArraySize( handshakes );
2423    int       i;
2424    size_t    total;
2425
2426    for( i = total = 0; i < n; ++i )
2427    {
2428        tr_peerIo * io = tr_handshakeGetIO( tr_ptrArrayNth( handshakes, i ) );
2429        total += tr_peerIoGetBandwidthUsed( io, direction );
2430    }
2431    return total;
2432}
2433
2434static size_t
2435countPeerBandwidth( tr_ptrArray * peers,
2436                    tr_direction  direction )
2437{
2438    const int n = tr_ptrArraySize( peers );
2439    int       i;
2440    size_t    total;
2441
2442    for( i = total = 0; i < n; ++i )
2443    {
2444        tr_peer * peer = tr_ptrArrayNth( peers, i );
2445        total += tr_peerIoGetBandwidthUsed( peer->io, direction );
2446    }
2447    return total;
2448}
2449
2450static void
2451givePeersUnlimitedBandwidth( tr_ptrArray * peers,
2452                             tr_direction  direction )
2453{
2454    const int n = tr_ptrArraySize( peers );
2455    int       i;
2456
2457    for( i = 0; i < n; ++i )
2458    {
2459        tr_peer * peer = tr_ptrArrayNth( peers, i );
2460        tr_peerIoSetBandwidthUnlimited( peer->io, direction );
2461    }
2462}
2463
2464static void
2465pumpAllPeers( tr_peerMgr * mgr )
2466{
2467    const int torrentCount = tr_ptrArraySize( mgr->torrents );
2468    int       i, j;
2469
2470    for( i = 0; i < torrentCount; ++i )
2471    {
2472        Torrent * t = tr_ptrArrayNth( mgr->torrents, i );
2473        for( j = 0; j < tr_ptrArraySize( t->peers ); ++j )
2474        {
2475            tr_peer * peer = tr_ptrArrayNth( t->peers, j );
2476            tr_peerMsgsPulse( peer->msgs );
2477        }
2478    }
2479}
2480
2481/**
2482 * Allocate bandwidth for each peer connection.
2483 *
2484 * @param mgr the peer manager
2485 * @param direction whether to allocate upload or download bandwidth
2486 * @return the amount of directional bandwidth used since the last pulse.
2487 */
2488static double
2489allocateBandwidth( tr_peerMgr * mgr,
2490                   tr_direction direction )
2491{
2492    tr_session *  session = mgr->session;
2493    const int     torrentCount = tr_ptrArraySize( mgr->torrents );
2494    Torrent **    torrents = (Torrent **) tr_ptrArrayBase( mgr->torrents );
2495    tr_ptrArray * globalPool = tr_ptrArrayNew( );
2496    double        allBytesUsed = 0;
2497    size_t        poolBytesUsed = 0;
2498    int           i;
2499
2500    assert( mgr );
2501    assert( direction == TR_UP || direction == TR_DOWN );
2502
2503    /* before allocating bandwidth, pump the connected peers */
2504    pumpAllPeers( mgr );
2505
2506    for( i=0; i<torrentCount; ++i )
2507    {
2508        Torrent * t = torrents[i];
2509        size_t used;
2510        tr_speedlimit speedMode;
2511
2512        /* no point in allocating bandwidth for stopped torrents */
2513        if( tr_torrentGetActivity( t->tor ) == TR_STATUS_STOPPED )
2514            continue;
2515
2516        used = countPeerBandwidth( t->peers, direction );
2517        countHandshakeBandwidth( t->outgoingHandshakes, direction );
2518
2519        /* remember this torrent's bytes used */
2520        tr_rcTransferred( t->tor->rawSpeed[direction], used );
2521
2522        /* add this torrent's bandwidth use to allBytesUsed */
2523        allBytesUsed += used;
2524
2525        /* if piece data is disallowed, don't bother limiting bandwidth --
2526         * we won't be asking for, or sending out, any pieces */
2527        if( !tr_torrentIsPieceTransferAllowed( t->tor, direction ) )
2528            speedMode = TR_SPEEDLIMIT_UNLIMITED;
2529        else
2530            speedMode = tr_torrentGetSpeedMode( t->tor, direction );
2531           
2532        /* process the torrent's peers based on its speed mode */
2533        switch( speedMode )
2534        {
2535            case TR_SPEEDLIMIT_UNLIMITED:
2536                givePeersUnlimitedBandwidth( t->peers, direction );
2537                break;
2538
2539            case TR_SPEEDLIMIT_SINGLE:
2540                setPeerBandwidth( t->peers, direction,
2541                                  t->tor->rawSpeed[direction],
2542                                  tr_torrentGetSpeedLimit( t->tor, direction ) );
2543                break;
2544
2545            case TR_SPEEDLIMIT_GLOBAL:
2546            {
2547                int       i;
2548                const int n = tr_ptrArraySize( t->peers );
2549                for( i = 0; i < n; ++i )
2550                    tr_ptrArrayAppend( globalPool,
2551                                      tr_ptrArrayNth( t->peers, i ) );
2552                poolBytesUsed += used;
2553                break;
2554            }
2555        }
2556    }
2557
2558    /* add incoming handshakes to the global pool */
2559    i = countHandshakeBandwidth( mgr->incomingHandshakes, direction );
2560    allBytesUsed += i;
2561    poolBytesUsed += i;
2562
2563    tr_rcTransferred( mgr->globalPoolRawSpeed[direction], poolBytesUsed );
2564
2565    /* handle the global pool's connections */
2566    if( !tr_sessionIsSpeedLimitEnabled( session, direction ) )
2567        givePeersUnlimitedBandwidth( globalPool, direction );
2568    else
2569        setPeerBandwidth( globalPool, direction,
2570                          mgr->globalPoolRawSpeed[direction],
2571                          tr_sessionGetSpeedLimit( session, direction ) );
2572
2573    /* now that we've allocated bandwidth, pump all the connected peers */
2574    pumpAllPeers( mgr );
2575
2576    /* cleanup */
2577    tr_ptrArrayFree( globalPool, NULL );
2578    return allBytesUsed;
2579}
2580
2581static int
2582bandwidthPulse( void * vmgr )
2583{
2584    tr_peerMgr * mgr = vmgr;
2585    int          i;
2586
2587    managerLock( mgr );
2588
2589    /* allocate the upload and download bandwidth */
2590    for( i = 0; i < 2; ++i )
2591        allocateBandwidth( mgr, i );
2592
2593    managerUnlock( mgr );
2594    return TRUE;
2595}
2596
Note: See TracBrowser for help on using the repository browser.