source: trunk/libtransmission/peer-mgr.c @ 7125

Last change on this file since 7125 was 7125, checked in by charles, 12 years ago

(libT) better possible fix for #1468: Speed display is very jumpy

  • Property svn:keywords set to Date Rev Author Id
File size: 75.1 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 7125 2008-11-17 04:00:57Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h> /* memcpy, memcmp, strstr */
16#include <stdlib.h> /* qsort */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "blocklist.h"
23#include "clients.h"
24#include "completion.h"
25#include "crypto.h"
26#include "handshake.h"
27#include "inout.h" /* tr_ioTestPiece */
28#include "net.h"
29#include "peer-io.h"
30#include "peer-mgr.h"
31#include "peer-mgr-private.h"
32#include "peer-msgs.h"
33#include "ptrarray.h"
34#include "ratecontrol.h"
35#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
36#include "torrent.h"
37#include "trevent.h"
38#include "utils.h"
39#include "webseed.h"
40
41enum
42{
43    /* how frequently to change which peers are choked */
44    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
45
46    /* minimum interval for refilling peers' request lists */
47    REFILL_PERIOD_MSEC = 333,
48
49    /* when many peers are available, keep idle ones this long */
50    MIN_UPLOAD_IDLE_SECS = ( 60 * 3 ),
51
52    /* when few peers are available, keep idle ones this long */
53    MAX_UPLOAD_IDLE_SECS = ( 60 * 10 ),
54
55    /* how frequently to decide which peers live and die */
56    RECONNECT_PERIOD_MSEC = ( 2 * 1000 ),
57   
58    /* how frequently to reallocate bandwidth */
59    BANDWIDTH_PERIOD_MSEC = 250,
60
61    /* max # of peers to ask fer per torrent per reconnect pulse */
62    MAX_RECONNECTIONS_PER_PULSE = 2,
63
64    /* max number of peers to ask for per second overall.
65    * this throttle is to avoid overloading the router */
66    MAX_CONNECTIONS_PER_SECOND = 4,
67
68    /* number of unchoked peers per torrent.
69     * FIXME: this probably ought to be configurable */
70    MAX_UNCHOKED_PEERS = 12,
71
72    /* number of bad pieces a peer is allowed to send before we ban them */
73    MAX_BAD_PIECES_PER_PEER = 3,
74
75    /* use for bitwise operations w/peer_atom.myflags */
76    MYFLAG_BANNED = 1,
77
78    /* unreachable for now... but not banned.
79     * if they try to connect to us it's okay */
80    MYFLAG_UNREACHABLE = 2,
81
82    /* the minimum we'll wait before attempting to reconnect to a peer */
83    MINIMUM_RECONNECT_INTERVAL_SECS = 5
84};
85
86
87/**
88***
89**/
90
91/* We keep one of these for every peer we know about, whether
92 * it's connected or not, so the struct must be small.
93 * When our current connections underperform, we dip back
94 * into this list for new ones. */
95struct peer_atom
96{
97    uint8_t           from;
98    uint8_t           flags; /* these match the added_f flags */
99    uint8_t           myflags; /* flags that aren't defined in added_f */
100    uint16_t          port;
101    uint16_t          numFails;
102    struct in_addr    addr;
103    time_t            time; /* when the peer's connection status last changed */
104    time_t            piece_data_time;
105};
106
107typedef struct
108{
109    unsigned int    isRunning : 1;
110
111    uint8_t         hash[SHA_DIGEST_LENGTH];
112    int         *   pendingRequestCount;
113    tr_ptrArray *   outgoingHandshakes; /* tr_handshake */
114    tr_ptrArray *   pool; /* struct peer_atom */
115    tr_ptrArray *   peers; /* tr_peer */
116    tr_ptrArray *   webseeds; /* tr_webseed */
117    tr_timer *      reconnectTimer;
118    tr_timer *      rechokeTimer;
119    tr_timer *      refillTimer;
120    tr_torrent *    tor;
121    tr_peer *       optimistic; /* the optimistic peer, or NULL if none */
122
123    struct tr_peerMgr * manager;
124}
125Torrent;
126
127struct tr_peerMgr
128{
129    tr_session      * session;
130    tr_ptrArray     * torrents; /* Torrent */
131    tr_ptrArray     * incomingHandshakes; /* tr_handshake */
132    tr_timer        * bandwidthTimer;
133    tr_ratecontrol  * globalPoolRawSpeed[2];
134};
135
136#define tordbg( t, ... ) \
137    do { \
138        if( tr_deepLoggingIsActive( ) ) \
139            tr_deepLog( __FILE__, __LINE__, t->tor->info.name, __VA_ARGS__ ); \
140    } while( 0 )
141
142#define dbgmsg( ... ) \
143    do { \
144        if( tr_deepLoggingIsActive( ) ) \
145            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
146    } while( 0 )
147
148/**
149***
150**/
151
152static void
153managerLock( const struct tr_peerMgr * manager )
154{
155    tr_globalLock( manager->session );
156}
157
158static void
159managerUnlock( const struct tr_peerMgr * manager )
160{
161    tr_globalUnlock( manager->session );
162}
163
164static void
165torrentLock( Torrent * torrent )
166{
167    managerLock( torrent->manager );
168}
169
170static void
171torrentUnlock( Torrent * torrent )
172{
173    managerUnlock( torrent->manager );
174}
175
176static int
177torrentIsLocked( const Torrent * t )
178{
179    return tr_globalIsLocked( t->manager->session );
180}
181
182/**
183***
184**/
185
186static int
187compareAddresses( const struct in_addr * a,
188                  const struct in_addr * b )
189{
190    if( a->s_addr != b->s_addr )
191        return a->s_addr < b->s_addr ? -1 : 1;
192
193    return 0;
194}
195
196static int
197handshakeCompareToAddr( const void * va,
198                        const void * vb )
199{
200    const tr_handshake * a = va;
201
202    return compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
203}
204
205static int
206handshakeCompare( const void * a,
207                  const void * b )
208{
209    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
210}
211
212static tr_handshake*
213getExistingHandshake( tr_ptrArray *          handshakes,
214                      const struct in_addr * in_addr )
215{
216    return tr_ptrArrayFindSorted( handshakes,
217                                  in_addr,
218                                  handshakeCompareToAddr );
219}
220
221static int
222comparePeerAtomToAddress( const void * va,
223                          const void * vb )
224{
225    const struct peer_atom * a = va;
226
227    return compareAddresses( &a->addr, vb );
228}
229
230static int
231comparePeerAtoms( const void * va,
232                  const void * vb )
233{
234    const struct peer_atom * b = vb;
235
236    return comparePeerAtomToAddress( va, &b->addr );
237}
238
239/**
240***
241**/
242
243static int
244torrentCompare( const void * va,
245                const void * vb )
246{
247    const Torrent * a = va;
248    const Torrent * b = vb;
249
250    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
251}
252
253static int
254torrentCompareToHash( const void * va,
255                      const void * vb )
256{
257    const Torrent * a = va;
258    const uint8_t * b_hash = vb;
259
260    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
261}
262
263static Torrent*
264getExistingTorrent( tr_peerMgr *    manager,
265                    const uint8_t * hash )
266{
267    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
268                                             hash,
269                                             torrentCompareToHash );
270}
271
272static int
273peerCompare( const void * va,
274             const void * vb )
275{
276    const tr_peer * a = va;
277    const tr_peer * b = vb;
278
279    return compareAddresses( &a->in_addr, &b->in_addr );
280}
281
282static int
283peerCompareToAddr( const void * va,
284                   const void * vb )
285{
286    const tr_peer * a = va;
287
288    return compareAddresses( &a->in_addr, vb );
289}
290
291static tr_peer*
292getExistingPeer( Torrent *              torrent,
293                 const struct in_addr * in_addr )
294{
295    assert( torrentIsLocked( torrent ) );
296    assert( in_addr );
297
298    return tr_ptrArrayFindSorted( torrent->peers,
299                                  in_addr,
300                                  peerCompareToAddr );
301}
302
303static struct peer_atom*
304getExistingAtom( const                  Torrent * t,
305                 const struct in_addr * addr )
306{
307    assert( torrentIsLocked( t ) );
308    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
309}
310
311static int
312peerIsInUse( const Torrent *        ct,
313             const struct in_addr * addr )
314{
315    Torrent * t = (Torrent*) ct;
316
317    assert( torrentIsLocked ( t ) );
318
319    return getExistingPeer( t, addr )
320           || getExistingHandshake( t->outgoingHandshakes, addr )
321           || getExistingHandshake( t->manager->incomingHandshakes, addr );
322}
323
324static tr_peer*
325peerConstructor( const struct in_addr * in_addr )
326{
327    tr_peer * p;
328
329    p = tr_new0( tr_peer, 1 );
330    memcpy( &p->in_addr, in_addr, sizeof( struct in_addr ) );
331    p->pieceSpeed[TR_CLIENT_TO_PEER] = tr_rcInit( );
332    p->pieceSpeed[TR_PEER_TO_CLIENT] = tr_rcInit( );
333    p->rawSpeed[TR_CLIENT_TO_PEER] = tr_rcInit( );
334    p->rawSpeed[TR_PEER_TO_CLIENT] = tr_rcInit( );
335    return p;
336}
337
338static tr_peer*
339getPeer( Torrent *              torrent,
340         const struct in_addr * in_addr )
341{
342    tr_peer * peer;
343
344    assert( torrentIsLocked( torrent ) );
345
346    peer = getExistingPeer( torrent, in_addr );
347
348    if( peer == NULL )
349    {
350        peer = peerConstructor( in_addr );
351        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
352    }
353
354    return peer;
355}
356
357static void
358peerDestructor( tr_peer * peer )
359{
360    assert( peer );
361    assert( peer->msgs );
362
363    tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
364    tr_peerMsgsFree( peer->msgs );
365
366    tr_peerIoFree( peer->io );
367
368    tr_bitfieldFree( peer->have );
369    tr_bitfieldFree( peer->blame );
370    tr_free( peer->client );
371
372    tr_rcClose( peer->rawSpeed[TR_CLIENT_TO_PEER] );
373    tr_rcClose( peer->rawSpeed[TR_PEER_TO_CLIENT] );
374    tr_rcClose( peer->pieceSpeed[TR_CLIENT_TO_PEER] );
375    tr_rcClose( peer->pieceSpeed[TR_PEER_TO_CLIENT] );
376    tr_free( peer );
377}
378
379static void
380removePeer( Torrent * t,
381            tr_peer * peer )
382{
383    tr_peer *          removed;
384    struct peer_atom * atom;
385
386    assert( torrentIsLocked( t ) );
387
388    atom = getExistingAtom( t, &peer->in_addr );
389    assert( atom );
390    atom->time = time( NULL );
391
392    removed = tr_ptrArrayRemoveSorted( t->peers, peer, peerCompare );
393    assert( removed == peer );
394    peerDestructor( removed );
395}
396
397static void
398removeAllPeers( Torrent * t )
399{
400    while( !tr_ptrArrayEmpty( t->peers ) )
401        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
402}
403
404static void
405torrentDestructor( void * vt )
406{
407    Torrent * t = vt;
408    uint8_t   hash[SHA_DIGEST_LENGTH];
409
410    assert( t );
411    assert( !t->isRunning );
412    assert( t->peers );
413    assert( torrentIsLocked( t ) );
414    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
415    assert( tr_ptrArrayEmpty( t->peers ) );
416
417    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
418
419    tr_timerFree( &t->reconnectTimer );
420    tr_timerFree( &t->rechokeTimer );
421    tr_timerFree( &t->refillTimer );
422
423    tr_ptrArrayFree( t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
424    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
425    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
426    tr_ptrArrayFree( t->peers, NULL );
427
428    tr_free( t->pendingRequestCount );
429    tr_free( t );
430}
431
432static void peerCallbackFunc( void * vpeer,
433                              void * vevent,
434                              void * vt );
435
436static Torrent*
437torrentConstructor( tr_peerMgr * manager,
438                    tr_torrent * tor )
439{
440    int       i;
441    Torrent * t;
442
443    t = tr_new0( Torrent, 1 );
444    t->manager = manager;
445    t->tor = tor;
446    t->pool = tr_ptrArrayNew( );
447    t->peers = tr_ptrArrayNew( );
448    t->webseeds = tr_ptrArrayNew( );
449    t->outgoingHandshakes = tr_ptrArrayNew( );
450    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
451
452    for( i = 0; i < tor->info.webseedCount; ++i )
453    {
454        tr_webseed * w =
455            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc,
456                           t );
457        tr_ptrArrayAppend( t->webseeds, w );
458    }
459
460    return t;
461}
462
463/**
464 * For explanation, see http://www.bittorrent.org/fast_extensions.html
465 * Also see the "test-allowed-set" unit test
466 *
467 * @param k number of pieces in set
468 * @param sz number of pieces in the torrent
469 * @param infohash torrent's SHA1 hash
470 * @param ip peer's address
471 */
472struct tr_bitfield *
473tr_peerMgrGenerateAllowedSet(
474    const uint32_t         k,
475    const uint32_t         sz,
476    const                  uint8_t        *
477                           infohash,
478    const struct in_addr * ip )
479{
480    uint8_t       w[SHA_DIGEST_LENGTH + 4];
481    uint8_t       x[SHA_DIGEST_LENGTH];
482    tr_bitfield * a;
483    uint32_t      a_size;
484
485    *(uint32_t*)w = ntohl( htonl( ip->s_addr ) & 0xffffff00 );   /* (1) */
486    memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );              /* (2) */
487    tr_sha1( x, w, sizeof( w ), NULL );                        /* (3) */
488
489    a = tr_bitfieldNew( sz );
490    a_size = 0;
491
492    while( a_size < k )
493    {
494        int i;
495        for( i = 0; i < 5 && a_size < k; ++i )                      /* (4) */
496        {
497            uint32_t j = i * 4;                                /* (5) */
498            uint32_t y = ntohl( *( uint32_t* )( x + j ) );             /* (6) */
499            uint32_t index = y % sz;                           /* (7) */
500            if( !tr_bitfieldHas( a, index ) )                  /* (8) */
501            {
502                tr_bitfieldAdd( a, index );                    /* (9) */
503                ++a_size;
504            }
505        }
506        tr_sha1( x, x, sizeof( x ), NULL );                    /* (3) */
507    }
508
509    return a;
510}
511
512static int bandwidthPulse( void * vmgr );
513
514
515tr_peerMgr*
516tr_peerMgrNew( tr_session * session )
517{
518    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
519
520    m->session = session;
521    m->torrents = tr_ptrArrayNew( );
522    m->incomingHandshakes = tr_ptrArrayNew( );
523    m->globalPoolRawSpeed[TR_CLIENT_TO_PEER] = tr_rcInit( );
524    m->globalPoolRawSpeed[TR_PEER_TO_CLIENT] = tr_rcInit( );
525    m->bandwidthTimer = tr_timerNew( session, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
526    return m;
527}
528
529void
530tr_peerMgrFree( tr_peerMgr * manager )
531{
532    managerLock( manager );
533
534    tr_timerFree( &manager->bandwidthTimer );
535    tr_rcClose( manager->globalPoolRawSpeed[TR_CLIENT_TO_PEER] );
536    tr_rcClose( manager->globalPoolRawSpeed[TR_PEER_TO_CLIENT] );
537
538    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
539     * the item from manager->handshakes, so this is a little roundabout... */
540    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
541        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
542
543    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
544
545    /* free the torrents. */
546    tr_ptrArrayFree( manager->torrents, torrentDestructor );
547
548    managerUnlock( manager );
549    tr_free( manager );
550}
551
552static tr_peer**
553getConnectedPeers( Torrent * t,
554                   int *     setmeCount )
555{
556    int       i, peerCount, connectionCount;
557    tr_peer **peers;
558    tr_peer **ret;
559
560    assert( torrentIsLocked( t ) );
561
562    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
563    ret = tr_new( tr_peer *, peerCount );
564
565    for( i = connectionCount = 0; i < peerCount; ++i )
566        if( peers[i]->msgs )
567            ret[connectionCount++] = peers[i];
568
569    *setmeCount = connectionCount;
570    return ret;
571}
572
573static int
574clientIsDownloadingFrom( const tr_peer * peer )
575{
576    return peer->clientIsInterested && !peer->clientIsChoked;
577}
578
579static int
580clientIsUploadingTo( const tr_peer * peer )
581{
582    return peer->peerIsInterested && !peer->peerIsChoked;
583}
584
585/***
586****
587***/
588
589int
590tr_peerMgrPeerIsSeed( const tr_peerMgr *     mgr,
591                      const uint8_t *        torrentHash,
592                      const struct in_addr * addr )
593{
594    int                      isSeed = FALSE;
595    const Torrent *          t = NULL;
596    const struct peer_atom * atom = NULL;
597
598    t = getExistingTorrent( (tr_peerMgr*)mgr, torrentHash );
599    if( t )
600        atom = getExistingAtom( t, addr );
601    if( atom )
602        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
603
604    return isSeed;
605}
606
607/****
608*****
609*****  REFILL
610*****
611****/
612
613static void
614assertValidPiece( Torrent * t, tr_piece_index_t piece )
615{
616    assert( t );
617    assert( t->tor );
618    assert( piece < t->tor->info.pieceCount );
619}
620
621static int
622getPieceRequests( Torrent * t, tr_piece_index_t piece )
623{
624    assertValidPiece( t, piece );
625
626    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
627}
628
629static void
630incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
631{
632    assertValidPiece( t, piece );
633
634    if( t->pendingRequestCount == NULL )
635        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
636    t->pendingRequestCount[piece]++;
637}
638
639static void
640decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
641{
642    assertValidPiece( t, piece );
643
644    if( t->pendingRequestCount )
645        t->pendingRequestCount[piece]--;
646}
647
648struct tr_refill_piece
649{
650    tr_priority_t    priority;
651    uint32_t         piece;
652    uint32_t         peerCount;
653    int              random;
654    int              pendingRequestCount;
655    int              missingBlockCount;
656};
657
658static int
659compareRefillPiece( const void * aIn,
660                    const void * bIn )
661{
662    const struct tr_refill_piece * a = aIn;
663    const struct tr_refill_piece * b = bIn;
664
665    /* if one piece has a higher priority, it goes first */
666    if( a->priority != b->priority )
667        return a->priority > b->priority ? -1 : 1;
668
669    /* have a per-priority endgame */
670    if( a->pendingRequestCount != b->pendingRequestCount )
671        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
672
673    /* fewer missing pieces goes first */
674    if( a->missingBlockCount != b->missingBlockCount )
675        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
676
677    /* otherwise if one has fewer peers, it goes first */
678    if( a->peerCount != b->peerCount )
679        return a->peerCount < b->peerCount ? -1 : 1;
680
681    /* otherwise go with our random seed */
682    if( a->random != b->random )
683        return a->random < b->random ? -1 : 1;
684
685    return 0;
686}
687
688static tr_piece_index_t *
689getPreferredPieces( Torrent           * t,
690                    tr_piece_index_t  * pieceCount )
691{
692    const tr_torrent  * tor = t->tor;
693    const tr_info     * inf = &tor->info;
694    tr_piece_index_t    i;
695    tr_piece_index_t    poolSize = 0;
696    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
697    int                 peerCount;
698    tr_peer**           peers;
699
700    assert( torrentIsLocked( t ) );
701
702    peers = getConnectedPeers( t, &peerCount );
703
704    /* make a list of the pieces that we want but don't have */
705    for( i = 0; i < inf->pieceCount; ++i )
706        if( !tor->info.pieces[i].dnd
707                && !tr_cpPieceIsComplete( tor->completion, i ) )
708            pool[poolSize++] = i;
709
710    /* sort the pool by which to request next */
711    if( poolSize > 1 )
712    {
713        tr_piece_index_t j;
714        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
715
716        for( j = 0; j < poolSize; ++j )
717        {
718            int k;
719            const tr_piece_index_t piece = pool[j];
720            struct tr_refill_piece * setme = p + j;
721
722            setme->piece = piece;
723            setme->priority = inf->pieces[piece].priority;
724            setme->peerCount = 0;
725            setme->random = tr_cryptoWeakRandInt( INT_MAX );
726            setme->pendingRequestCount = getPieceRequests( t, piece );
727            setme->missingBlockCount
728                         = tr_cpMissingBlocksInPiece( tor->completion, piece );
729
730            for( k = 0; k < peerCount; ++k )
731            {
732                const tr_peer * peer = peers[k];
733                if( peer->peerIsInterested
734                        && !peer->clientIsChoked
735                        && tr_bitfieldHas( peer->have, piece ) )
736                    ++setme->peerCount;
737            }
738        }
739
740        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
741               compareRefillPiece );
742
743        for( j = 0; j < poolSize; ++j )
744            pool[j] = p[j].piece;
745
746        tr_free( p );
747    }
748
749    tr_free( peers );
750
751    *pieceCount = poolSize;
752    return pool;
753}
754
755struct tr_blockIterator
756{
757    Torrent * t;
758    tr_block_index_t blockIndex, blockCount, *blocks;
759    tr_piece_index_t pieceIndex, pieceCount, *pieces;
760};
761
762static struct tr_blockIterator*
763blockIteratorNew( Torrent * t )
764{
765    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
766    i->t = t;
767    i->pieces = getPreferredPieces( t, &i->pieceCount );
768    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCount );
769    return i;
770}
771
772static int
773blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
774{
775    int found;
776    Torrent * t = i->t;
777    tr_torrent * tor = t->tor;
778
779    while( ( i->blockIndex == i->blockCount )
780        && ( i->pieceIndex < i->pieceCount ) )
781    {
782        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
783        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
784        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
785        tr_block_index_t block;
786
787        assert( index < tor->info.pieceCount );
788
789        i->blockCount = 0;
790        i->blockIndex = 0;
791        for( block=b; block!=e; ++block )
792            if( !tr_cpBlockIsComplete( tor->completion, block ) )
793                i->blocks[i->blockCount++] = block;
794    }
795
796    if(( found = ( i->blockIndex < i->blockCount )))
797        *setme = i->blocks[i->blockIndex++];
798
799    return found;
800}
801
802static void
803blockIteratorFree( struct tr_blockIterator * i )
804{
805    tr_free( i->blocks );
806    tr_free( i->pieces );
807    tr_free( i );
808}
809
810static tr_peer**
811getPeersUploadingToClient( Torrent * t,
812                           int *     setmeCount )
813{
814    int j;
815    int peerCount = 0;
816    int retCount = 0;
817    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
818    tr_peer ** ret = tr_new( tr_peer *, peerCount );
819
820    j = 0; /* this is a temporary test to make sure we walk through all the peers */
821    if( peerCount )
822    {
823        /* Get a list of peers we're downloading from.
824           Pick a different starting point each time so all peers
825           get a chance at being the first in line */
826        const int fencepost = tr_cryptoWeakRandInt( peerCount );
827        int i = fencepost;
828        do {
829            if( clientIsDownloadingFrom( peers[i] ) )
830                ret[retCount++] = peers[i];
831            i = ( i + 1 ) % peerCount;
832            ++j;
833        } while( i != fencepost );
834    }
835    assert( j == peerCount );
836    *setmeCount = retCount;
837    return ret;
838}
839
840static uint32_t
841getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
842{
843    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
844    const uint64_t blockPos = tor->blockSize * b;
845    assert( blockPos >= piecePos );
846    return (uint32_t)( blockPos - piecePos );
847}
848
849static int
850refillPulse( void * vtorrent )
851{
852    tr_block_index_t block;
853    int peerCount;
854    int webseedCount;
855    tr_peer ** peers;
856    tr_webseed ** webseeds;
857    struct tr_blockIterator * blockIterator;
858    Torrent * t = vtorrent;
859    tr_torrent * tor = t->tor;
860
861    if( !t->isRunning )
862        return TRUE;
863    if( tr_torrentIsSeed( t->tor ) )
864        return TRUE;
865
866    torrentLock( t );
867    tordbg( t, "Refilling Request Buffers..." );
868
869    blockIterator = blockIteratorNew( t );
870    peers = getPeersUploadingToClient( t, &peerCount );
871    webseedCount = tr_ptrArraySize( t->webseeds );
872    webseeds = tr_memdup( tr_ptrArrayBase( t->webseeds ),
873                          webseedCount * sizeof( tr_webseed* ) );
874
875    while( ( webseedCount || peerCount )
876        && blockIteratorNext( blockIterator, &block ) )
877    {
878        int j;
879        int handled = FALSE;
880
881        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
882        const uint32_t offset = getBlockOffsetInPiece( tor, block );
883        const uint32_t length = tr_torBlockCountBytes( tor, block );
884
885        /* find a peer who can ask for this block */
886        for( j=0; !handled && j<peerCount; )
887        {
888            const int val = tr_peerMsgsAddRequest( peers[j]->msgs,
889                                                   index, offset, length );
890            switch( val )
891            {
892                case TR_ADDREQ_FULL:
893                case TR_ADDREQ_CLIENT_CHOKED:
894                    peers[j] = peers[--peerCount];
895                    break;
896
897                case TR_ADDREQ_MISSING:
898                case TR_ADDREQ_DUPLICATE:
899                    ++j;
900                    break;
901
902                case TR_ADDREQ_OK:
903                    incrementPieceRequests( t, index );
904                    handled = TRUE;
905                    break;
906
907                default:
908                    assert( 0 && "unhandled value" );
909                    break;
910            }
911        }
912
913        /* maybe one of the webseeds can do it */
914        for( j=0; !handled && j<webseedCount; )
915        {
916            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j],
917                                                          index, offset, length );
918            switch( val )
919            {
920                case TR_ADDREQ_FULL:
921                    webseeds[j] = webseeds[--webseedCount];
922                    break;
923
924                case TR_ADDREQ_OK:
925                    incrementPieceRequests( t, index );
926                    handled = TRUE;
927                    break;
928
929                default:
930                    assert( 0 && "unhandled value" );
931                    break;
932            }
933        }
934    }
935
936    /* cleanup */
937    blockIteratorFree( blockIterator );
938    tr_free( webseeds );
939    tr_free( peers );
940
941    t->refillTimer = NULL;
942    torrentUnlock( t );
943    return FALSE;
944}
945
946static void
947broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
948{
949    int i, size;
950    tr_peer ** peers;
951
952    assert( torrentIsLocked( t ) );
953
954    peers = getConnectedPeers( t, &size );
955    for( i=0; i<size; ++i )
956        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
957    tr_free( peers );
958}
959
960static void
961addStrike( Torrent * t,
962           tr_peer * peer )
963{
964    tordbg( t, "increasing peer %s strike count to %d",
965            tr_peerIoAddrStr( &peer->in_addr,
966                              peer->port ), peer->strikes + 1 );
967
968    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
969    {
970        struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
971        atom->myflags |= MYFLAG_BANNED;
972        peer->doPurge = 1;
973        tordbg( t, "banning peer %s",
974               tr_peerIoAddrStr( &atom->addr, atom->port ) );
975    }
976}
977
978static void
979gotBadPiece( Torrent *        t,
980             tr_piece_index_t pieceIndex )
981{
982    tr_torrent *   tor = t->tor;
983    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
984
985    tor->corruptCur += byteCount;
986    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
987}
988
989static void
990refillSoon( Torrent * t )
991{
992    if( t->refillTimer == NULL )
993        t->refillTimer = tr_timerNew( t->manager->session,
994                                      refillPulse, t,
995                                      REFILL_PERIOD_MSEC );
996}
997
998static void
999peerCallbackFunc( void * vpeer,
1000                  void * vevent,
1001                  void * vt )
1002{
1003    tr_peer *             peer = vpeer; /* may be NULL if peer is a webseed */
1004    Torrent *             t = (Torrent *) vt;
1005    const tr_peer_event * e = vevent;
1006
1007    torrentLock( t );
1008
1009    switch( e->eventType )
1010    {
1011        case TR_PEER_NEED_REQ:
1012            refillSoon( t );
1013            break;
1014
1015        case TR_PEER_CANCEL:
1016            decrementPieceRequests( t, e->pieceIndex );
1017            break;
1018
1019        case TR_PEER_PEER_GOT_DATA:
1020        {
1021            const time_t now = time( NULL );
1022            tr_torrent * tor = t->tor;
1023            const tr_direction dir = TR_CLIENT_TO_PEER;
1024
1025            tor->activityDate = now;
1026
1027            if( e->wasPieceData )
1028                tor->uploadedCur += e->length;
1029
1030            /* add it to the raw upload speed */
1031            if( peer )
1032                tr_rcTransferred ( peer->rawSpeed[dir], e->length );
1033            tr_rcTransferred ( tor->rawSpeed[dir], e->length );
1034            tr_rcTransferred ( tor->session->rawSpeed[dir], e->length );
1035
1036            /* maybe add it to the piece upload speed */
1037            if( e->wasPieceData ) {
1038                if( peer )
1039                    tr_rcTransferred ( peer->pieceSpeed[dir], e->length );
1040                tr_rcTransferred ( tor->pieceSpeed[dir], e->length );
1041                tr_rcTransferred ( tor->session->pieceSpeed[dir], e->length );
1042            }
1043
1044            /* update the stats */
1045            if( e->wasPieceData )
1046                tr_statsAddUploaded( tor->session, e->length );
1047
1048            /* update our atom */
1049            if( peer ) {
1050                struct peer_atom * a = getExistingAtom( t, &peer->in_addr );
1051                a->piece_data_time = now;
1052            }
1053
1054            break;
1055        }
1056
1057        case TR_PEER_CLIENT_GOT_DATA:
1058        {
1059            const time_t now = time( NULL );
1060            tr_torrent * tor = t->tor;
1061            const tr_direction dir = TR_PEER_TO_CLIENT;
1062
1063            tor->activityDate = now;
1064
1065            /* only add this to downloadedCur if we got it from a peer --
1066             * webseeds shouldn't count against our ratio.  As one tracker
1067             * admin put it, "Those pieces are downloaded directly from the
1068             * content distributor, not the peers, it is the tracker's job
1069             * to manage the swarms, not the web server and does not fit
1070             * into the jurisdiction of the tracker." */
1071            if( peer && e->wasPieceData )
1072                tor->downloadedCur += e->length;
1073
1074            /* add it to our raw download speed */
1075            if( peer )
1076                tr_rcTransferred ( peer->rawSpeed[dir], e->length );
1077            tr_rcTransferred ( tor->rawSpeed[dir], e->length );
1078            tr_rcTransferred ( tor->session->rawSpeed[dir], e->length );
1079
1080            /* maybe add it to the piece upload speed */
1081            if( e->wasPieceData ) {
1082                if( peer )
1083                    tr_rcTransferred ( peer->pieceSpeed[dir], e->length );
1084                tr_rcTransferred ( tor->pieceSpeed[dir], e->length );
1085                tr_rcTransferred ( tor->session->pieceSpeed[dir], e->length );
1086            }
1087     
1088            /* update the stats */ 
1089            if( e->wasPieceData )
1090                tr_statsAddDownloaded( tor->session, e->length );
1091
1092            /* update our atom */
1093            if( peer ) {
1094                struct peer_atom * a = getExistingAtom( t, &peer->in_addr );
1095                a->piece_data_time = now;
1096            }
1097
1098            break;
1099        }
1100
1101        case TR_PEER_PEER_PROGRESS:
1102        {
1103            if( peer )
1104            {
1105                struct peer_atom * atom = getExistingAtom( t,
1106                                                           &peer->in_addr );
1107                const int          peerIsSeed = e->progress >= 1.0;
1108                if( peerIsSeed )
1109                {
1110                    tordbg( t, "marking peer %s as a seed",
1111                           tr_peerIoAddrStr( &atom->addr,
1112                                             atom->port ) );
1113                    atom->flags |= ADDED_F_SEED_FLAG;
1114                }
1115                else
1116                {
1117                    tordbg( t, "marking peer %s as a non-seed",
1118                           tr_peerIoAddrStr( &atom->addr,
1119                                             atom->port ) );
1120                    atom->flags &= ~ADDED_F_SEED_FLAG;
1121                }
1122            }
1123            break;
1124        }
1125
1126        case TR_PEER_CLIENT_GOT_BLOCK:
1127        {
1128            tr_torrent *     tor = t->tor;
1129
1130            tr_block_index_t block = _tr_block( tor, e->pieceIndex,
1131                                                e->offset );
1132
1133            tr_cpBlockAdd( tor->completion, block );
1134            decrementPieceRequests( t, e->pieceIndex );
1135
1136            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1137
1138            if( tr_cpPieceIsComplete( tor->completion, e->pieceIndex ) )
1139            {
1140                const tr_piece_index_t p = e->pieceIndex;
1141                const int              ok = tr_ioTestPiece( tor, p );
1142
1143                if( !ok )
1144                {
1145                    tr_torerr( tor,
1146                              _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1147                              (unsigned long)p );
1148                }
1149
1150                tr_torrentSetHasPiece( tor, p, ok );
1151                tr_torrentSetPieceChecked( tor, p, TRUE );
1152                tr_peerMgrSetBlame( tor->session->peerMgr, tor->info.hash, p, ok );
1153
1154                if( !ok )
1155                    gotBadPiece( t, p );
1156                else
1157                {
1158                    int        i, peerCount;
1159                    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1160                    for( i = 0; i < peerCount; ++i )
1161                        tr_peerMsgsHave( peers[i]->msgs, p );
1162                    tr_free( peers );
1163                }
1164
1165                tr_torrentRecheckCompleteness( tor );
1166            }
1167            break;
1168        }
1169
1170        case TR_PEER_ERROR:
1171            if( e->err == EINVAL )
1172            {
1173                addStrike( t, peer );
1174                peer->doPurge = 1;
1175            }
1176            else if( ( e->err == ERANGE )
1177                  || ( e->err == EMSGSIZE )
1178                  || ( e->err == ENOTCONN ) )
1179            {
1180                /* some protocol error from the peer */
1181                peer->doPurge = 1;
1182            }
1183            else /* a local error, such as an IO error */
1184            {
1185                t->tor->error = e->err;
1186                tr_strlcpy( t->tor->errorString,
1187                            tr_strerror( t->tor->error ),
1188                            sizeof( t->tor->errorString ) );
1189                tr_torrentStop( t->tor );
1190            }
1191            break;
1192
1193        default:
1194            assert( 0 );
1195    }
1196
1197    torrentUnlock( t );
1198}
1199
1200static void
1201ensureAtomExists( Torrent *              t,
1202                  const struct in_addr * addr,
1203                  uint16_t               port,
1204                  uint8_t                flags,
1205                  uint8_t                from )
1206{
1207    if( getExistingAtom( t, addr ) == NULL )
1208    {
1209        struct peer_atom * a;
1210        a = tr_new0( struct peer_atom, 1 );
1211        a->addr = *addr;
1212        a->port = port;
1213        a->flags = flags;
1214        a->from = from;
1215        tordbg( t, "got a new atom: %s",
1216               tr_peerIoAddrStr( &a->addr, a->port ) );
1217        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
1218    }
1219}
1220
1221static int
1222getMaxPeerCount( const tr_torrent * tor )
1223{
1224    return tor->maxConnectedPeers;
1225}
1226
1227static int
1228getPeerCount( const Torrent * t )
1229{
1230    return tr_ptrArraySize( t->peers ) + tr_ptrArraySize(
1231               t->outgoingHandshakes );
1232}
1233
1234/* FIXME: this is kind of a mess. */
1235static int
1236myHandshakeDoneCB( tr_handshake *  handshake,
1237                   tr_peerIo *     io,
1238                   int             isConnected,
1239                   const uint8_t * peer_id,
1240                   void *          vmanager )
1241{
1242    int                    ok = isConnected;
1243    int                    success = FALSE;
1244    uint16_t               port;
1245    const struct in_addr * addr;
1246    tr_peerMgr *           manager = (tr_peerMgr*) vmanager;
1247    Torrent *              t;
1248    tr_handshake *         ours;
1249
1250    assert( io );
1251    assert( isConnected == 0 || isConnected == 1 );
1252
1253    t = tr_peerIoHasTorrentHash( io )
1254        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1255        : NULL;
1256
1257    if( tr_peerIoIsIncoming ( io ) )
1258        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
1259                                        handshake, handshakeCompare );
1260    else if( t )
1261        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
1262                                        handshake, handshakeCompare );
1263    else
1264        ours = handshake;
1265
1266    assert( ours );
1267    assert( ours == handshake );
1268
1269    if( t )
1270        torrentLock( t );
1271
1272    addr = tr_peerIoGetAddress( io, &port );
1273
1274    if( !ok || !t || !t->isRunning )
1275    {
1276        if( t )
1277        {
1278            struct peer_atom * atom = getExistingAtom( t, addr );
1279            if( atom )
1280                ++atom->numFails;
1281        }
1282
1283        tr_peerIoFree( io );
1284    }
1285    else /* looking good */
1286    {
1287        struct peer_atom * atom;
1288        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1289        atom = getExistingAtom( t, addr );
1290        atom->time = time( NULL );
1291
1292        if( atom->myflags & MYFLAG_BANNED )
1293        {
1294            tordbg( t, "banned peer %s tried to reconnect",
1295                   tr_peerIoAddrStr( &atom->addr,
1296                                     atom->port ) );
1297            tr_peerIoFree( io );
1298        }
1299        else if( tr_peerIoIsIncoming( io )
1300               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1301
1302        {
1303            tr_peerIoFree( io );
1304        }
1305        else
1306        {
1307            tr_peer * peer = getExistingPeer( t, addr );
1308
1309            if( peer ) /* we already have this peer */
1310            {
1311                tr_peerIoFree( io );
1312            }
1313            else
1314            {
1315                peer = getPeer( t, addr );
1316                tr_free( peer->client );
1317
1318                if( !peer_id )
1319                    peer->client = NULL;
1320                else
1321                {
1322                    char client[128];
1323                    tr_clientForId( client, sizeof( client ), peer_id );
1324                    peer->client = tr_strdup( client );
1325                }
1326                peer->port = port;
1327                peer->io = io;
1328                peer->msgs =
1329                    tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t,
1330                                    &peer->msgsTag );
1331
1332                success = TRUE;
1333            }
1334        }
1335    }
1336
1337    if( t )
1338        torrentUnlock( t );
1339
1340    return success;
1341}
1342
1343void
1344tr_peerMgrAddIncoming( tr_peerMgr *     manager,
1345                       struct in_addr * addr,
1346                       uint16_t         port,
1347                       int              socket )
1348{
1349    managerLock( manager );
1350
1351    if( tr_sessionIsAddressBlocked( manager->session, addr ) )
1352    {
1353        tr_dbg( "Banned IP address \"%s\" tried to connect to us",
1354               inet_ntoa( *addr ) );
1355        tr_netClose( socket );
1356    }
1357    else if( getExistingHandshake( manager->incomingHandshakes, addr ) )
1358    {
1359        tr_netClose( socket );
1360    }
1361    else /* we don't have a connetion to them yet... */
1362    {
1363        tr_peerIo *    io;
1364        tr_handshake * handshake;
1365
1366        io = tr_peerIoNewIncoming( manager->session, addr, port, socket );
1367
1368        handshake = tr_handshakeNew( io,
1369                                     manager->session->encryptionMode,
1370                                     myHandshakeDoneCB,
1371                                     manager );
1372
1373        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake,
1374                                 handshakeCompare );
1375    }
1376
1377    managerUnlock( manager );
1378}
1379
1380void
1381tr_peerMgrAddPex( tr_peerMgr *    manager,
1382                  const uint8_t * torrentHash,
1383                  uint8_t         from,
1384                  const tr_pex *  pex )
1385{
1386    Torrent * t;
1387
1388    managerLock( manager );
1389
1390    t = getExistingTorrent( manager, torrentHash );
1391    if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->in_addr ) )
1392        ensureAtomExists( t, &pex->in_addr, pex->port, pex->flags, from );
1393
1394    managerUnlock( manager );
1395}
1396
1397tr_pex *
1398tr_peerMgrCompactToPex( const void *    compact,
1399                        size_t          compactLen,
1400                        const uint8_t * added_f,
1401                        size_t          added_f_len,
1402                        size_t *        pexCount )
1403{
1404    size_t          i;
1405    size_t          n = compactLen / 6;
1406    const uint8_t * walk = compact;
1407    tr_pex *        pex = tr_new0( tr_pex, n );
1408
1409    for( i = 0; i < n; ++i )
1410    {
1411        memcpy( &pex[i].in_addr, walk, 4 ); walk += 4;
1412        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1413        if( added_f && ( n == added_f_len ) )
1414            pex[i].flags = added_f[i];
1415    }
1416
1417    *pexCount = n;
1418    return pex;
1419}
1420
1421/**
1422***
1423**/
1424
1425void
1426tr_peerMgrSetBlame( tr_peerMgr *     manager,
1427                    const uint8_t *  torrentHash,
1428                    tr_piece_index_t pieceIndex,
1429                    int              success )
1430{
1431    if( !success )
1432    {
1433        int        peerCount, i;
1434        Torrent *  t = getExistingTorrent( manager, torrentHash );
1435        tr_peer ** peers;
1436
1437        assert( torrentIsLocked( t ) );
1438
1439        peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1440        for( i = 0; i < peerCount; ++i )
1441        {
1442            tr_peer * peer = peers[i];
1443            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1444            {
1445                tordbg(
1446                    t,
1447                    "peer %s contributed to corrupt piece (%d); now has %d strikes",
1448                    tr_peerIoAddrStr( &peer->in_addr, peer->port ),
1449                    pieceIndex, (int)peer->strikes + 1 );
1450                addStrike( t, peer );
1451            }
1452        }
1453    }
1454}
1455
1456int
1457tr_pexCompare( const void * va,
1458               const void * vb )
1459{
1460    const tr_pex * a = va;
1461    const tr_pex * b = vb;
1462    int            i =
1463        memcmp( &a->in_addr, &b->in_addr, sizeof( struct in_addr ) );
1464
1465    if( i ) return i;
1466    if( a->port < b->port ) return -1;
1467    if( a->port > b->port ) return 1;
1468    return 0;
1469}
1470
1471int tr_pexCompare( const void * a,
1472                   const void * b );
1473
1474static int
1475peerPrefersCrypto( const tr_peer * peer )
1476{
1477    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1478        return TRUE;
1479
1480    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1481        return FALSE;
1482
1483    return tr_peerIoIsEncrypted( peer->io );
1484}
1485
1486int
1487tr_peerMgrGetPeers( tr_peerMgr *    manager,
1488                    const uint8_t * torrentHash,
1489                    tr_pex **       setme_pex )
1490{
1491    int peerCount = 0;
1492    const Torrent *  t;
1493
1494    managerLock( manager );
1495
1496    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1497    if( !t )
1498    {
1499        *setme_pex = NULL;
1500    }
1501    else
1502    {
1503        int i;
1504        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1505        tr_pex * pex = tr_new( tr_pex, peerCount );
1506        tr_pex * walk = pex;
1507
1508        for( i = 0; i < peerCount; ++i, ++walk )
1509        {
1510            const tr_peer * peer = peers[i];
1511            walk->in_addr = peer->in_addr;
1512            walk->port = peer->port;
1513            walk->flags = 0;
1514            if( peerPrefersCrypto( peer ) ) walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1515            if( peer->progress >= 1.0 ) walk->flags |= ADDED_F_SEED_FLAG;
1516        }
1517
1518        assert( ( walk - pex ) == peerCount );
1519        qsort( pex, peerCount, sizeof( tr_pex ), tr_pexCompare );
1520        *setme_pex = pex;
1521    }
1522
1523    managerUnlock( manager );
1524    return peerCount;
1525}
1526
1527static int reconnectPulse( void * vtorrent );
1528
1529static int rechokePulse( void * vtorrent );
1530
1531void
1532tr_peerMgrStartTorrent( tr_peerMgr *    manager,
1533                        const uint8_t * torrentHash )
1534{
1535    Torrent * t;
1536
1537    managerLock( manager );
1538
1539    t = getExistingTorrent( manager, torrentHash );
1540
1541    assert( t );
1542    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1543    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1544
1545    if( !t->isRunning )
1546    {
1547        t->isRunning = 1;
1548
1549        t->reconnectTimer = tr_timerNew( t->manager->session,
1550                                         reconnectPulse, t,
1551                                         RECONNECT_PERIOD_MSEC );
1552
1553        t->rechokeTimer = tr_timerNew( t->manager->session,
1554                                       rechokePulse, t,
1555                                       RECHOKE_PERIOD_MSEC );
1556
1557        reconnectPulse( t );
1558
1559        rechokePulse( t );
1560
1561        if( !tr_ptrArrayEmpty( t->webseeds ) )
1562            refillSoon( t );
1563    }
1564
1565    managerUnlock( manager );
1566}
1567
1568static void
1569stopTorrent( Torrent * t )
1570{
1571    assert( torrentIsLocked( t ) );
1572
1573    t->isRunning = 0;
1574    tr_timerFree( &t->rechokeTimer );
1575    tr_timerFree( &t->reconnectTimer );
1576
1577    /* disconnect the peers. */
1578    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)peerDestructor );
1579    tr_ptrArrayClear( t->peers );
1580
1581    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1582     * which removes the handshake from t->outgoingHandshakes... */
1583    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1584        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1585}
1586
1587void
1588tr_peerMgrStopTorrent( tr_peerMgr *    manager,
1589                       const uint8_t * torrentHash )
1590{
1591    managerLock( manager );
1592
1593    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1594
1595    managerUnlock( manager );
1596}
1597
1598void
1599tr_peerMgrAddTorrent( tr_peerMgr * manager,
1600                      tr_torrent * tor )
1601{
1602    Torrent * t;
1603
1604    managerLock( manager );
1605
1606    assert( tor );
1607    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1608
1609    t = torrentConstructor( manager, tor );
1610    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1611
1612    managerUnlock( manager );
1613}
1614
1615void
1616tr_peerMgrRemoveTorrent( tr_peerMgr *    manager,
1617                         const uint8_t * torrentHash )
1618{
1619    Torrent * t;
1620
1621    managerLock( manager );
1622
1623    t = getExistingTorrent( manager, torrentHash );
1624    assert( t );
1625    stopTorrent( t );
1626    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1627    torrentDestructor( t );
1628
1629    managerUnlock( manager );
1630}
1631
1632void
1633tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1634                               const uint8_t *    torrentHash,
1635                               int8_t *           tab,
1636                               unsigned int       tabCount )
1637{
1638    tr_piece_index_t   i;
1639    const Torrent *    t;
1640    const tr_torrent * tor;
1641    float              interval;
1642    int                isComplete;
1643    int                peerCount;
1644    const tr_peer **   peers;
1645
1646    managerLock( manager );
1647
1648    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1649    tor = t->tor;
1650    interval = tor->info.pieceCount / (float)tabCount;
1651    isComplete = tor
1652                 && ( tr_cpGetStatus ( tor->completion ) == TR_CP_COMPLETE );
1653    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1654
1655    memset( tab, 0, tabCount );
1656
1657    for( i = 0; tor && i < tabCount; ++i )
1658    {
1659        const int piece = i * interval;
1660
1661        if( isComplete || tr_cpPieceIsComplete( tor->completion, piece ) )
1662            tab[i] = -1;
1663        else if( peerCount )
1664        {
1665            int j;
1666            for( j = 0; j < peerCount; ++j )
1667                if( tr_bitfieldHas( peers[j]->have, i ) )
1668                    ++tab[i];
1669        }
1670    }
1671
1672    managerUnlock( manager );
1673}
1674
1675/* Returns the pieces that are available from peers */
1676tr_bitfield*
1677tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1678                        const uint8_t *    torrentHash )
1679{
1680    int           i, size;
1681    Torrent *     t;
1682    tr_peer **    peers;
1683    tr_bitfield * pieces;
1684
1685    managerLock( manager );
1686
1687    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1688    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1689    peers = getConnectedPeers( t, &size );
1690    for( i = 0; i < size; ++i )
1691        tr_bitfieldOr( pieces, peers[i]->have );
1692
1693    managerUnlock( manager );
1694    tr_free( peers );
1695    return pieces;
1696}
1697
1698int
1699tr_peerMgrHasConnections( const tr_peerMgr * manager,
1700                          const uint8_t *    torrentHash )
1701{
1702    int             ret;
1703    const Torrent * t;
1704
1705    managerLock( manager );
1706
1707    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1708    ret = t && ( !tr_ptrArrayEmpty( t->peers )
1709               || !tr_ptrArrayEmpty( t->webseeds ) );
1710
1711    managerUnlock( manager );
1712    return ret;
1713}
1714
1715void
1716tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1717                        const uint8_t *    torrentHash,
1718                        int *              setmePeersKnown,
1719                        int *              setmePeersConnected,
1720                        int *              setmeSeedsConnected,
1721                        int *              setmeWebseedsSendingToUs,
1722                        int *              setmePeersSendingToUs,
1723                        int *              setmePeersGettingFromUs,
1724                        int *              setmePeersFrom )
1725{
1726    int                 i, size;
1727    const Torrent *     t;
1728    const tr_peer **    peers;
1729    const tr_webseed ** webseeds;
1730
1731    managerLock( manager );
1732
1733    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1734    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1735
1736    *setmePeersKnown           = tr_ptrArraySize( t->pool );
1737    *setmePeersConnected       = 0;
1738    *setmeSeedsConnected       = 0;
1739    *setmePeersGettingFromUs   = 0;
1740    *setmePeersSendingToUs     = 0;
1741    *setmeWebseedsSendingToUs  = 0;
1742
1743    for( i = 0; i < TR_PEER_FROM__MAX; ++i )
1744        setmePeersFrom[i] = 0;
1745
1746    for( i = 0; i < size; ++i )
1747    {
1748        const tr_peer *          peer = peers[i];
1749        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1750
1751        if( peer->io == NULL ) /* not connected */
1752            continue;
1753
1754        ++ * setmePeersConnected;
1755
1756        ++setmePeersFrom[atom->from];
1757
1758        if( clientIsDownloadingFrom( peer ) )
1759            ++ * setmePeersSendingToUs;
1760
1761        if( clientIsUploadingTo( peer ) )
1762            ++ * setmePeersGettingFromUs;
1763
1764        if( atom->flags & ADDED_F_SEED_FLAG )
1765            ++ * setmeSeedsConnected;
1766    }
1767
1768    webseeds = (const tr_webseed **) tr_ptrArrayPeek( t->webseeds, &size );
1769    for( i = 0; i < size; ++i )
1770    {
1771        if( tr_webseedIsActive( webseeds[i] ) )
1772            ++ * setmeWebseedsSendingToUs;
1773    }
1774
1775    managerUnlock( manager );
1776}
1777
1778float*
1779tr_peerMgrWebSpeeds( const tr_peerMgr * manager,
1780                     const uint8_t *    torrentHash )
1781{
1782    const Torrent *     t;
1783    const tr_webseed ** webseeds;
1784    int                 i;
1785    int                 webseedCount;
1786    float *             ret;
1787
1788    assert( manager );
1789    managerLock( manager );
1790
1791    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1792    webseeds = (const tr_webseed**) tr_ptrArrayPeek( t->webseeds,
1793                                                     &webseedCount );
1794    assert( webseedCount == t->tor->info.webseedCount );
1795    ret = tr_new0( float, webseedCount );
1796
1797    for( i = 0; i < webseedCount; ++i )
1798        if( !tr_webseedGetSpeed( webseeds[i], &ret[i] ) )
1799            ret[i] = -1.0;
1800
1801    managerUnlock( manager );
1802    return ret;
1803}
1804
1805double
1806tr_peerGetPieceSpeed( const tr_peer    * peer,
1807                      tr_direction       direction )
1808{
1809    assert( peer );
1810    assert( direction==TR_CLIENT_TO_PEER || direction==TR_PEER_TO_CLIENT );
1811
1812    return tr_rcRate( peer->pieceSpeed[direction] );
1813}
1814
1815
1816struct tr_peer_stat *
1817tr_peerMgrPeerStats( const   tr_peerMgr  * manager,
1818                     const   uint8_t     * torrentHash,
1819                     int                 * setmeCount UNUSED )
1820{
1821    int             i, size;
1822    const Torrent * t;
1823    tr_peer **      peers;
1824    tr_peer_stat *  ret;
1825
1826    assert( manager );
1827    managerLock( manager );
1828
1829    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1830    peers = getConnectedPeers( (Torrent*)t, &size );
1831    ret = tr_new0( tr_peer_stat, size );
1832
1833    for( i = 0; i < size; ++i )
1834    {
1835        char *                   pch;
1836        const tr_peer *          peer = peers[i];
1837        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1838        tr_peer_stat *           stat = ret + i;
1839
1840        tr_netNtop( &peer->in_addr, stat->addr, sizeof( stat->addr ) );
1841        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
1842                   sizeof( stat->client ) );
1843        stat->port               = peer->port;
1844        stat->from               = atom->from;
1845        stat->progress           = peer->progress;
1846        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1847        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER );
1848        stat->rateToClient       = tr_peerGetPieceSpeed( peer, TR_PEER_TO_CLIENT );
1849        stat->peerIsChoked       = peer->peerIsChoked;
1850        stat->peerIsInterested   = peer->peerIsInterested;
1851        stat->clientIsChoked     = peer->clientIsChoked;
1852        stat->clientIsInterested = peer->clientIsInterested;
1853        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1854        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1855        stat->isUploadingTo      = clientIsUploadingTo( peer );
1856
1857        pch = stat->flagStr;
1858        if( t->optimistic == peer ) *pch++ = 'O';
1859        if( stat->isDownloadingFrom ) *pch++ = 'D';
1860        else if( stat->clientIsInterested ) *pch++ = 'd';
1861        if( stat->isUploadingTo ) *pch++ = 'U';
1862        else if( stat->peerIsInterested ) *pch++ = 'u';
1863        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ =
1864                'K';
1865        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1866        if( stat->isEncrypted ) *pch++ = 'E';
1867        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1868        if( stat->isIncoming ) *pch++ = 'I';
1869        *pch = '\0';
1870    }
1871
1872    *setmeCount = size;
1873    tr_free( peers );
1874
1875    managerUnlock( manager );
1876    return ret;
1877}
1878
1879/**
1880***
1881**/
1882
1883struct ChokeData
1884{
1885    unsigned int    doUnchoke    : 1;
1886    unsigned int    isInterested : 1;
1887    unsigned int    isChoked     : 1;
1888    int             rate;
1889    tr_peer *       peer;
1890};
1891
1892static int
1893compareChoke( const void * va,
1894              const void * vb )
1895{
1896    const struct ChokeData * a = va;
1897    const struct ChokeData * b = vb;
1898
1899    if( a->rate != b->rate ) /* prefer higher overall speeds */
1900        return a->rate > b->rate ? -1 : 1;
1901
1902    if( a->isChoked != b->isChoked ) /* prefer unchoked */
1903        return a->isChoked ? 1 : -1;
1904
1905    return 0;
1906}
1907
1908static int
1909isNew( const tr_peer * peer )
1910{
1911    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1912}
1913
1914static int
1915isSame( const tr_peer * peer )
1916{
1917    return peer && peer->client && strstr( peer->client, "Transmission" );
1918}
1919
1920/**
1921***
1922**/
1923
1924static void
1925rechoke( Torrent * t )
1926{
1927    int                i, peerCount, size, unchokedInterested;
1928    tr_peer **         peers = getConnectedPeers( t, &peerCount );
1929    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1930    const int          chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
1931
1932    assert( torrentIsLocked( t ) );
1933
1934    /* sort the peers by preference and rate */
1935    for( i = 0, size = 0; i < peerCount; ++i )
1936    {
1937        tr_peer * peer = peers[i];
1938        if( peer->progress >= 1.0 ) /* choke all seeds */
1939            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1940        else if( chokeAll )
1941            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1942        else {
1943            struct ChokeData * n = &choke[size++];
1944            n->peer         = peer;
1945            n->isInterested = peer->peerIsInterested;
1946            n->isChoked     = peer->peerIsChoked;
1947            n->rate = (int)(tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER )
1948                            + tr_peerGetPieceSpeed( peer, TR_PEER_TO_CLIENT ) );
1949        }
1950    }
1951
1952    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
1953
1954    /**
1955     * Reciprocation and number of uploads capping is managed by unchoking
1956     * the N peers which have the best upload rate and are interested.
1957     * This maximizes the client's download rate. These N peers are
1958     * referred to as downloaders, because they are interested in downloading
1959     * from the client.
1960     *
1961     * Peers which have a better upload rate (as compared to the downloaders)
1962     * but aren't interested get unchoked. If they become interested, the
1963     * downloader with the worst upload rate gets choked. If a client has
1964     * a complete file, it uses its upload rate rather than its download
1965     * rate to decide which peers to unchoke.
1966     */
1967    unchokedInterested = 0;
1968    for( i = 0; i < size && unchokedInterested < MAX_UNCHOKED_PEERS; ++i )
1969    {
1970        choke[i].doUnchoke = 1;
1971        if( choke[i].isInterested )
1972            ++unchokedInterested;
1973    }
1974
1975    /* optimistic unchoke */
1976    if( i < size )
1977    {
1978        int                n;
1979        struct ChokeData * c;
1980        tr_ptrArray *      randPool = tr_ptrArrayNew( );
1981
1982        for( ; i < size; ++i )
1983        {
1984            if( choke[i].isInterested )
1985            {
1986                const tr_peer * peer = choke[i].peer;
1987                int             x = 1, y;
1988                if( isNew( peer ) ) x *= 3;
1989                if( isSame( peer ) ) x *= 3;
1990                for( y = 0; y < x; ++y )
1991                    tr_ptrArrayAppend( randPool, &choke[i] );
1992            }
1993        }
1994
1995        if( ( n = tr_ptrArraySize( randPool ) ) )
1996        {
1997            c = tr_ptrArrayNth( randPool, tr_cryptoWeakRandInt( n ) );
1998            c->doUnchoke = 1;
1999            t->optimistic = c->peer;
2000        }
2001
2002        tr_ptrArrayFree( randPool, NULL );
2003    }
2004
2005    for( i = 0; i < size; ++i )
2006        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
2007
2008    /* cleanup */
2009    tr_free( choke );
2010    tr_free( peers );
2011}
2012
2013static int
2014rechokePulse( void * vtorrent )
2015{
2016    Torrent * t = vtorrent;
2017
2018    torrentLock( t );
2019    rechoke( t );
2020    torrentUnlock( t );
2021    return TRUE;
2022}
2023
2024/***
2025****
2026****  Life and Death
2027****
2028***/
2029
2030static int
2031shouldPeerBeClosed( const Torrent * t,
2032                    const tr_peer * peer,
2033                    int             peerCount )
2034{
2035    const tr_torrent *       tor = t->tor;
2036    const time_t             now = time( NULL );
2037    const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
2038
2039    /* if it's marked for purging, close it */
2040    if( peer->doPurge )
2041    {
2042        tordbg( t, "purging peer %s because its doPurge flag is set",
2043               tr_peerIoAddrStr( &atom->addr,
2044                                 atom->port ) );
2045        return TRUE;
2046    }
2047
2048    /* if we're seeding and the peer has everything we have,
2049     * and enough time has passed for a pex exchange, then disconnect */
2050    if( tr_torrentIsSeed( tor ) )
2051    {
2052        int peerHasEverything;
2053        if( atom->flags & ADDED_F_SEED_FLAG )
2054            peerHasEverything = TRUE;
2055        else if( peer->progress < tr_cpPercentDone( tor->completion ) )
2056            peerHasEverything = FALSE;
2057        else
2058        {
2059            tr_bitfield * tmp =
2060                tr_bitfieldDup( tr_cpPieceBitfield( tor->completion ) );
2061            tr_bitfieldDifference( tmp, peer->have );
2062            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2063            tr_bitfieldFree( tmp );
2064        }
2065        if( peerHasEverything
2066          && ( !tr_torrentAllowsPex( tor ) || ( now - atom->time >= 30 ) ) )
2067        {
2068            tordbg( t, "purging peer %s because we're both seeds",
2069                   tr_peerIoAddrStr( &atom->addr,
2070                                     atom->port ) );
2071            return TRUE;
2072        }
2073    }
2074
2075    /* disconnect if it's been too long since piece data has been transferred.
2076     * this is on a sliding scale based on number of available peers... */
2077    {
2078        const int    relaxStrictnessIfFewerThanN =
2079            (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2080        /* if we have >= relaxIfFewerThan, strictness is 100%.
2081         * if we have zero connections, strictness is 0% */
2082        const float  strictness = peerCount >= relaxStrictnessIfFewerThanN
2083                                  ? 1.0
2084                                  : peerCount /
2085                                  (float)relaxStrictnessIfFewerThanN;
2086        const int    lo = MIN_UPLOAD_IDLE_SECS;
2087        const int    hi = MAX_UPLOAD_IDLE_SECS;
2088        const int    limit = lo + ( ( hi - lo ) * strictness );
2089        const time_t then = peer->pieceDataActivityDate;
2090        const int    idleTime = then ? ( now - then ) : 0;
2091        if( idleTime > limit )
2092        {
2093            tordbg(
2094                t,
2095                "purging peer %s because it's been %d secs since we shared anything",
2096                tr_peerIoAddrStr( &atom->addr, atom->port ), idleTime );
2097            return TRUE;
2098        }
2099    }
2100
2101    return FALSE;
2102}
2103
2104static tr_peer **
2105getPeersToClose( Torrent * t,
2106                 int *     setmeSize )
2107{
2108    int               i, peerCount, outsize;
2109    tr_peer **        peers = (tr_peer**) tr_ptrArrayPeek( t->peers,
2110                                                           &peerCount );
2111    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2112
2113    assert( torrentIsLocked( t ) );
2114
2115    for( i = outsize = 0; i < peerCount; ++i )
2116        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
2117            ret[outsize++] = peers[i];
2118
2119    *setmeSize = outsize;
2120    return ret;
2121}
2122
2123static int
2124compareCandidates( const void * va,
2125                   const void * vb )
2126{
2127    const struct peer_atom * a = *(const struct peer_atom**) va;
2128    const struct peer_atom * b = *(const struct peer_atom**) vb;
2129
2130    /* <Charles> Here we would probably want to try reconnecting to
2131     * peers that had most recently given us data. Lots of users have
2132     * trouble with resets due to their routers and/or ISPs. This way we
2133     * can quickly recover from an unwanted reset. So we sort
2134     * piece_data_time in descending order.
2135     */
2136
2137    if( a->piece_data_time != b->piece_data_time )
2138        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2139
2140    if( a->numFails != b->numFails )
2141        return a->numFails < b->numFails ? -1 : 1;
2142
2143    if( a->time != b->time )
2144        return a->time < b->time ? -1 : 1;
2145
2146    return 0;
2147}
2148
2149static int
2150getReconnectIntervalSecs( const struct peer_atom * atom )
2151{
2152    int          sec;
2153    const time_t now = time( NULL );
2154
2155    /* if we were recently connected to this peer and transferring piece
2156     * data, try to reconnect to them sooner rather that later -- we don't
2157     * want network troubles to get in the way of a good peer. */
2158    if( ( now - atom->piece_data_time ) <=
2159       ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2160        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2161
2162    /* don't allow reconnects more often than our minimum */
2163    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2164        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2165
2166    /* otherwise, the interval depends on how many times we've tried
2167     * and failed to connect to the peer */
2168    else switch( atom->numFails )
2169        {
2170            case 0:
2171                sec = 0; break;
2172
2173            case 1:
2174                sec = 5; break;
2175
2176            case 2:
2177                sec = 2 * 60; break;
2178
2179            case 3:
2180                sec = 15 * 60; break;
2181
2182            case 4:
2183                sec = 30 * 60; break;
2184
2185            case 5:
2186                sec = 60 * 60; break;
2187
2188            default:
2189                sec = 120 * 60; break;
2190        }
2191
2192    return sec;
2193}
2194
2195static struct peer_atom **
2196getPeerCandidates(                               Torrent * t,
2197                                           int * setmeSize )
2198{
2199    int                 i, atomCount, retCount;
2200    struct peer_atom ** atoms;
2201    struct peer_atom ** ret;
2202    const time_t        now = time( NULL );
2203    const int           seed = tr_torrentIsSeed( t->tor );
2204
2205    assert( torrentIsLocked( t ) );
2206
2207    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &atomCount );
2208    ret = tr_new( struct peer_atom*, atomCount );
2209    for( i = retCount = 0; i < atomCount; ++i )
2210    {
2211        int                interval;
2212        struct peer_atom * atom = atoms[i];
2213
2214        /* peer fed us too much bad data ... we only keep it around
2215         * now to weed it out in case someone sends it to us via pex */
2216        if( atom->myflags & MYFLAG_BANNED )
2217            continue;
2218
2219        /* peer was unconnectable before, so we're not going to keep trying.
2220         * this is needs a separate flag from `banned', since if they try
2221         * to connect to us later, we'll let them in */
2222        if( atom->myflags & MYFLAG_UNREACHABLE )
2223            continue;
2224
2225        /* we don't need two connections to the same peer... */
2226        if( peerIsInUse( t, &atom->addr ) )
2227            continue;
2228
2229        /* no need to connect if we're both seeds... */
2230        if( seed && ( atom->flags & ADDED_F_SEED_FLAG ) )
2231            continue;
2232
2233        /* don't reconnect too often */
2234        interval = getReconnectIntervalSecs( atom );
2235        if( ( now - atom->time ) < interval )
2236        {
2237            tordbg(
2238                t,
2239                "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2240                i, tr_peerIoAddrStr( &atom->addr,
2241                                     atom->port ), interval );
2242            continue;
2243        }
2244
2245        /* Don't connect to peers in our blocklist */
2246        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2247            continue;
2248
2249        ret[retCount++] = atom;
2250    }
2251
2252    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2253    *setmeSize = retCount;
2254    return ret;
2255}
2256
2257static int
2258reconnectPulse( void * vtorrent )
2259{
2260    Torrent *     t = vtorrent;
2261    static time_t prevTime = 0;
2262    static int    newConnectionsThisSecond = 0;
2263    time_t        now;
2264
2265    torrentLock( t );
2266
2267    now = time( NULL );
2268    if( prevTime != now )
2269    {
2270        prevTime = now;
2271        newConnectionsThisSecond = 0;
2272    }
2273
2274    if( !t->isRunning )
2275    {
2276        removeAllPeers( t );
2277    }
2278    else
2279    {
2280        int                 i, nCandidates, nBad;
2281        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
2282        struct tr_peer **   connections = getPeersToClose( t, &nBad );
2283
2284        if( nBad || nCandidates )
2285            tordbg(
2286                t, "reconnect pulse for [%s]: %d bad connections, "
2287                   "%d connection candidates, %d atoms, max per pulse is %d",
2288                t->tor->info.name, nBad, nCandidates,
2289                tr_ptrArraySize( t->pool ),
2290                (int)MAX_RECONNECTIONS_PER_PULSE );
2291
2292        /* disconnect some peers.
2293           if we transferred piece data, then they might be good peers,
2294           so reset their `numFails' weight to zero.  otherwise we connected
2295           to them fruitlessly, so mark it as another fail */
2296        for( i = 0; i < nBad; ++i )
2297        {
2298            tr_peer *          peer = connections[i];
2299            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
2300            if( peer->pieceDataActivityDate )
2301                atom->numFails = 0;
2302            else
2303                ++atom->numFails;
2304            tordbg( t, "removing bad peer %s",
2305                   tr_peerIoGetAddrStr( peer->io ) );
2306            removePeer( t, peer );
2307        }
2308
2309        /* add some new ones */
2310        for( i = 0;    ( i < nCandidates )
2311           && ( i < MAX_RECONNECTIONS_PER_PULSE )
2312           && ( getPeerCount( t ) < getMaxPeerCount( t->tor ) )
2313           && ( newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND );
2314             ++i )
2315        {
2316            tr_peerMgr *       mgr = t->manager;
2317            struct peer_atom * atom = candidates[i];
2318            tr_peerIo *        io;
2319
2320            tordbg( t, "Starting an OUTGOING connection with %s",
2321                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2322
2323            io =
2324                tr_peerIoNewOutgoing( mgr->session, &atom->addr, atom->port,
2325                                      t->hash );
2326            if( io == NULL )
2327            {
2328                atom->myflags |= MYFLAG_UNREACHABLE;
2329            }
2330            else
2331            {
2332                tr_handshake * handshake = tr_handshakeNew(
2333                    io,
2334                    mgr->session->
2335                    encryptionMode,
2336                    myHandshakeDoneCB,
2337                    mgr );
2338
2339                assert( tr_peerIoGetTorrentHash( io ) );
2340
2341                ++newConnectionsThisSecond;
2342
2343                tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake,
2344                                         handshakeCompare );
2345            }
2346
2347            atom->time = time( NULL );
2348        }
2349
2350        /* cleanup */
2351        tr_free( connections );
2352        tr_free( candidates );
2353    }
2354
2355    torrentUnlock( t );
2356    return TRUE;
2357}
2358
2359/****
2360*****
2361*****  BANDWIDTH ALLOCATION
2362*****
2363****/
2364
2365static double
2366allocateHowMuch( double                  desired_average_kb_per_sec,
2367                 const tr_ratecontrol  * ratecontrol )
2368{
2369    const int pulses_per_history = TR_RATECONTROL_HISTORY_MSEC / BANDWIDTH_PERIOD_MSEC;
2370    const double seconds_per_pulse = BANDWIDTH_PERIOD_MSEC / 1000.0;
2371    const double baseline_bytes_per_pulse = desired_average_kb_per_sec * 1024.0 * seconds_per_pulse;
2372    const double min = baseline_bytes_per_pulse * 0.80;
2373    const double max = baseline_bytes_per_pulse * 1.10;
2374    const double current_bytes_per_pulse = tr_rcRate( ratecontrol ) * 1024.0 * seconds_per_pulse;
2375    const double next_pulse_bytes = baseline_bytes_per_pulse * ( pulses_per_history + 1 )
2376                                  - ( current_bytes_per_pulse * pulses_per_history );
2377    double clamped;
2378
2379    /* clamp the return value to lessen oscillation */
2380    clamped = next_pulse_bytes;
2381    clamped = MAX( clamped, min );
2382    clamped = MIN( clamped, max );
2383
2384#if 0
2385fprintf( stderr, "desiredAvgKB is %5.2f, rate is %5.2f, allocating %5.2f (%5.2f)\n",
2386         desired_average_kb_per_sec,
2387         tr_rcRate( ratecontrol ),
2388         clamped/1024.0,
2389         next_pulse_bytes/1024.0 );
2390#endif
2391
2392    return clamped;
2393}
2394
2395/**
2396 * Distributes a fixed amount of bandwidth among a set of peers.
2397 *
2398 * @param peerArray peers whose client-to-peer bandwidth will be set
2399 * @param direction whether to allocate upload or download bandwidth
2400 * @param history recent bandwidth history for these peers
2401 * @param desiredAvgKB overall bandwidth goal for this set of peers
2402 */
2403static void
2404setPeerBandwidth( tr_ptrArray          * peerArray,
2405                  const tr_direction     direction,
2406                  const tr_ratecontrol * ratecontrol,
2407                  double                 desiredAvgKB )
2408{
2409    const int    peerCount = tr_ptrArraySize( peerArray );
2410    const double bytes = allocateHowMuch( desiredAvgKB, ratecontrol );
2411    const double welfareBytes = MIN( 2048, bytes * 0.2 );
2412    const double meritBytes = MAX( 0, bytes - welfareBytes );
2413    tr_peer **   peers = (tr_peer**) tr_ptrArrayBase( peerArray );
2414    tr_peer **   candidates = tr_new( tr_peer *, peerCount );
2415    int          i;
2416    int          candidateCount;
2417    double       welfare;
2418    size_t       bytesUsed;
2419
2420    assert( meritBytes >= 0.0 );
2421    assert( welfareBytes >= 0.0 );
2422    assert( direction == TR_UP || direction == TR_DOWN );
2423
2424    for( i = candidateCount = 0; i < peerCount; ++i )
2425        if( tr_peerIoWantsBandwidth( peers[i]->io, direction ) )
2426            candidates[candidateCount++] = peers[i];
2427        else
2428            tr_peerIoSetBandwidth( peers[i]->io, direction, 0 );
2429
2430    for( i = bytesUsed = 0; i < candidateCount; ++i )
2431        bytesUsed += tr_peerIoGetBandwidthUsed( candidates[i]->io,
2432                                                direction );
2433
2434    welfare = welfareBytes / candidateCount;
2435
2436    for( i = 0; i < candidateCount; ++i )
2437    {
2438        tr_peer *    peer = candidates[i];
2439        const double merit = bytesUsed
2440                             ? ( meritBytes *
2441                                tr_peerIoGetBandwidthUsed( peer->io,
2442                                                           direction ) ) /
2443                             bytesUsed
2444                             : ( meritBytes / candidateCount );
2445        tr_peerIoSetBandwidth( peer->io, direction, merit + welfare );
2446    }
2447
2448    /* cleanup */
2449    tr_free( candidates );
2450}
2451
2452static size_t
2453countHandshakeBandwidth( tr_ptrArray * handshakes,
2454                         tr_direction  direction )
2455{
2456    const int n = tr_ptrArraySize( handshakes );
2457    int       i;
2458    size_t    total;
2459
2460    for( i = total = 0; i < n; ++i )
2461    {
2462        tr_peerIo * io = tr_handshakeGetIO( tr_ptrArrayNth( handshakes, i ) );
2463        total += tr_peerIoGetBandwidthUsed( io, direction );
2464    }
2465    return total;
2466}
2467
2468static size_t
2469countPeerBandwidth( tr_ptrArray * peers,
2470                    tr_direction  direction )
2471{
2472    const int n = tr_ptrArraySize( peers );
2473    int       i;
2474    size_t    total;
2475
2476    for( i = total = 0; i < n; ++i )
2477    {
2478        tr_peer * peer = tr_ptrArrayNth( peers, i );
2479        total += tr_peerIoGetBandwidthUsed( peer->io, direction );
2480    }
2481    return total;
2482}
2483
2484static void
2485givePeersUnlimitedBandwidth( tr_ptrArray * peers,
2486                             tr_direction  direction )
2487{
2488    const int n = tr_ptrArraySize( peers );
2489    int       i;
2490
2491    for( i = 0; i < n; ++i )
2492    {
2493        tr_peer * peer = tr_ptrArrayNth( peers, i );
2494        tr_peerIoSetBandwidthUnlimited( peer->io, direction );
2495    }
2496}
2497
2498static void
2499pumpAllPeers( tr_peerMgr * mgr )
2500{
2501    const int torrentCount = tr_ptrArraySize( mgr->torrents );
2502    int       i, j;
2503
2504    for( i = 0; i < torrentCount; ++i )
2505    {
2506        Torrent * t = tr_ptrArrayNth( mgr->torrents, i );
2507        for( j = 0; j < tr_ptrArraySize( t->peers ); ++j )
2508        {
2509            tr_peer * peer = tr_ptrArrayNth( t->peers, j );
2510            tr_peerMsgsPulse( peer->msgs );
2511        }
2512    }
2513}
2514
2515/**
2516 * Allocate bandwidth for each peer connection.
2517 *
2518 * @param mgr the peer manager
2519 * @param direction whether to allocate upload or download bandwidth
2520 * @return the amount of directional bandwidth used since the last pulse.
2521 */
2522static double
2523allocateBandwidth( tr_peerMgr * mgr,
2524                   tr_direction direction )
2525{
2526    tr_session *  session = mgr->session;
2527    const int     torrentCount = tr_ptrArraySize( mgr->torrents );
2528    Torrent **    torrents = (Torrent **) tr_ptrArrayBase( mgr->torrents );
2529    tr_ptrArray * globalPool = tr_ptrArrayNew( );
2530    double        allBytesUsed = 0;
2531    size_t        poolBytesUsed = 0;
2532    int           i;
2533
2534    assert( mgr );
2535    assert( direction == TR_UP || direction == TR_DOWN );
2536
2537    /* before allocating bandwidth, pump the connected peers */
2538    pumpAllPeers( mgr );
2539
2540    for( i=0; i<torrentCount; ++i )
2541    {
2542        Torrent * t = torrents[i];
2543        size_t used;
2544        tr_speedlimit speedMode;
2545
2546        /* no point in allocating bandwidth for stopped torrents */
2547        if( tr_torrentGetActivity( t->tor ) == TR_STATUS_STOPPED )
2548            continue;
2549
2550        used = countPeerBandwidth( t->peers, direction );
2551        countHandshakeBandwidth( t->outgoingHandshakes, direction );
2552
2553        /* remember this torrent's bytes used */
2554        tr_rcTransferred( t->tor->rawSpeed[direction], used );
2555
2556        /* add this torrent's bandwidth use to allBytesUsed */
2557        allBytesUsed += used;
2558
2559        /* if piece data is disallowed, don't bother limiting bandwidth --
2560         * we won't be asking for, or sending out, any pieces */
2561        if( !tr_torrentIsPieceTransferAllowed( t->tor, direction ) )
2562            speedMode = TR_SPEEDLIMIT_UNLIMITED;
2563        else
2564            speedMode = tr_torrentGetSpeedMode( t->tor, direction );
2565           
2566        /* process the torrent's peers based on its speed mode */
2567        switch( speedMode )
2568        {
2569            case TR_SPEEDLIMIT_UNLIMITED:
2570                givePeersUnlimitedBandwidth( t->peers, direction );
2571                break;
2572
2573            case TR_SPEEDLIMIT_SINGLE:
2574                setPeerBandwidth( t->peers, direction,
2575                                  t->tor->rawSpeed[direction],
2576                                  tr_torrentGetSpeedLimit( t->tor, direction ) );
2577                break;
2578
2579            case TR_SPEEDLIMIT_GLOBAL:
2580            {
2581                int       i;
2582                const int n = tr_ptrArraySize( t->peers );
2583                for( i = 0; i < n; ++i )
2584                    tr_ptrArrayAppend( globalPool,
2585                                      tr_ptrArrayNth( t->peers, i ) );
2586                poolBytesUsed += used;
2587                break;
2588            }
2589        }
2590    }
2591
2592    /* add incoming handshakes to the global pool */
2593    i = countHandshakeBandwidth( mgr->incomingHandshakes, direction );
2594    allBytesUsed += i;
2595    poolBytesUsed += i;
2596
2597    tr_rcTransferred( mgr->globalPoolRawSpeed[direction], poolBytesUsed );
2598
2599    /* handle the global pool's connections */
2600    if( !tr_sessionIsSpeedLimitEnabled( session, direction ) )
2601        givePeersUnlimitedBandwidth( globalPool, direction );
2602    else
2603        setPeerBandwidth( globalPool, direction,
2604                          mgr->globalPoolRawSpeed[direction],
2605                          tr_sessionGetSpeedLimit( session, direction ) );
2606
2607    /* now that we've allocated bandwidth, pump all the connected peers */
2608    pumpAllPeers( mgr );
2609
2610    /* cleanup */
2611    tr_ptrArrayFree( globalPool, NULL );
2612    return allBytesUsed;
2613}
2614
2615static int
2616bandwidthPulse( void * vmgr )
2617{
2618    tr_peerMgr * mgr = vmgr;
2619    int          i;
2620
2621    managerLock( mgr );
2622
2623    /* allocate the upload and download bandwidth */
2624    for( i = 0; i < 2; ++i )
2625        allocateBandwidth( mgr, i );
2626
2627    managerUnlock( mgr );
2628    return TRUE;
2629}
2630
Note: See TracBrowser for help on using the repository browser.