source: trunk/libtransmission/peer-mgr.c @ 7060

Last change on this file since 7060 was 7060, checked in by charles, 13 years ago

(libT) fix r7055 bug reported by BentMyWookie?. Also, narrow the bandwidth allocator's `clamp' range to lessen oscillation

  • Property svn:keywords set to Date Rev Author Id
File size: 73.2 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 7060 2008-11-06 04:16:53Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h> /* memcpy, memcmp, strstr */
16#include <stdlib.h> /* qsort */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "blocklist.h"
23#include "clients.h"
24#include "completion.h"
25#include "crypto.h"
26#include "handshake.h"
27#include "inout.h" /* tr_ioTestPiece */
28#include "net.h"
29#include "peer-io.h"
30#include "peer-mgr.h"
31#include "peer-mgr-private.h"
32#include "peer-msgs.h"
33#include "ptrarray.h"
34#include "ratecontrol.h"
35#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
36#include "torrent.h"
37#include "trevent.h"
38#include "utils.h"
39#include "webseed.h"
40
41enum
42{
43    /* how frequently to change which peers are choked */
44    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
45
46    /* minimum interval for refilling peers' request lists */
47    REFILL_PERIOD_MSEC = 333,
48
49    /* when many peers are available, keep idle ones this long */
50    MIN_UPLOAD_IDLE_SECS = ( 60 * 3 ),
51
52    /* when few peers are available, keep idle ones this long */
53    MAX_UPLOAD_IDLE_SECS = ( 60 * 10 ),
54
55    /* how frequently to decide which peers live and die */
56    RECONNECT_PERIOD_MSEC = ( 2 * 1000 ),
57
58    /* max # of peers to ask fer per torrent per reconnect pulse */
59    MAX_RECONNECTIONS_PER_PULSE = 2,
60
61    /* max number of peers to ask for per second overall.
62    * this throttle is to avoid overloading the router */
63    MAX_CONNECTIONS_PER_SECOND = 4,
64
65    /* number of unchoked peers per torrent.
66     * FIXME: this probably ought to be configurable */
67    MAX_UNCHOKED_PEERS = 12,
68
69    /* number of bad pieces a peer is allowed to send before we ban them */
70    MAX_BAD_PIECES_PER_PEER = 3,
71
72    /* use for bitwise operations w/peer_atom.myflags */
73    MYFLAG_BANNED = 1,
74
75    /* unreachable for now... but not banned.
76     * if they try to connect to us it's okay */
77    MYFLAG_UNREACHABLE = 2,
78
79    /* the minimum we'll wait before attempting to reconnect to a peer */
80    MINIMUM_RECONNECT_INTERVAL_SECS = 5
81};
82
83
84/**
85***
86**/
87
88/* We keep one of these for every peer we know about, whether
89 * it's connected or not, so the struct must be small.
90 * When our current connections underperform, we dip back
91 * into this list for new ones. */
92struct peer_atom
93{
94    uint8_t           from;
95    uint8_t           flags; /* these match the added_f flags */
96    uint8_t           myflags; /* flags that aren't defined in added_f */
97    uint16_t          port;
98    uint16_t          numFails;
99    struct in_addr    addr;
100    time_t            time; /* when the peer's connection status last changed */
101    time_t            piece_data_time;
102};
103
104typedef struct
105{
106    unsigned int    isRunning : 1;
107
108    uint8_t         hash[SHA_DIGEST_LENGTH];
109    int         *   pendingRequestCount;
110    tr_ptrArray *   outgoingHandshakes; /* tr_handshake */
111    tr_ptrArray *   pool; /* struct peer_atom */
112    tr_ptrArray *   peers; /* tr_peer */
113    tr_ptrArray *   webseeds; /* tr_webseed */
114    tr_timer *      reconnectTimer;
115    tr_timer *      rechokeTimer;
116    tr_timer *      refillTimer;
117    tr_torrent *    tor;
118    tr_peer *       optimistic; /* the optimistic peer, or NULL if none */
119
120    struct tr_peerMgr * manager;
121}
122Torrent;
123
124struct tr_peerMgr
125{
126    uint8_t        bandwidthPulseNumber;
127    tr_session *   session;
128    tr_ptrArray *  torrents; /* Torrent */
129    tr_ptrArray *  incomingHandshakes; /* tr_handshake */
130    tr_timer *     bandwidthTimer;
131    double         globalPoolHistory[2][BANDWIDTH_PULSE_HISTORY];
132};
133
134#define tordbg( t, ... ) \
135    do { \
136        if( tr_deepLoggingIsActive( ) ) \
137            tr_deepLog( __FILE__, __LINE__, t->tor->info.name, __VA_ARGS__ ); \
138    } while( 0 )
139
140#define dbgmsg( ... ) \
141    do { \
142        if( tr_deepLoggingIsActive( ) ) \
143            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
144    } while( 0 )
145
146/**
147***
148**/
149
150static void
151managerLock( const struct tr_peerMgr * manager )
152{
153    tr_globalLock( manager->session );
154}
155
156static void
157managerUnlock( const struct tr_peerMgr * manager )
158{
159    tr_globalUnlock( manager->session );
160}
161
162static void
163torrentLock( Torrent * torrent )
164{
165    managerLock( torrent->manager );
166}
167
168static void
169torrentUnlock( Torrent * torrent )
170{
171    managerUnlock( torrent->manager );
172}
173
174static int
175torrentIsLocked( const Torrent * t )
176{
177    return tr_globalIsLocked( t->manager->session );
178}
179
180/**
181***
182**/
183
184static int
185compareAddresses( const struct in_addr * a,
186                  const struct in_addr * b )
187{
188    if( a->s_addr != b->s_addr )
189        return a->s_addr < b->s_addr ? -1 : 1;
190
191    return 0;
192}
193
194static int
195handshakeCompareToAddr( const void * va,
196                        const void * vb )
197{
198    const tr_handshake * a = va;
199
200    return compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
201}
202
203static int
204handshakeCompare( const void * a,
205                  const void * b )
206{
207    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
208}
209
210static tr_handshake*
211getExistingHandshake( tr_ptrArray *          handshakes,
212                      const struct in_addr * in_addr )
213{
214    return tr_ptrArrayFindSorted( handshakes,
215                                  in_addr,
216                                  handshakeCompareToAddr );
217}
218
219static int
220comparePeerAtomToAddress( const void * va,
221                          const void * vb )
222{
223    const struct peer_atom * a = va;
224
225    return compareAddresses( &a->addr, vb );
226}
227
228static int
229comparePeerAtoms( const void * va,
230                  const void * vb )
231{
232    const struct peer_atom * b = vb;
233
234    return comparePeerAtomToAddress( va, &b->addr );
235}
236
237/**
238***
239**/
240
241static int
242torrentCompare( const void * va,
243                const void * vb )
244{
245    const Torrent * a = va;
246    const Torrent * b = vb;
247
248    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
249}
250
251static int
252torrentCompareToHash( const void * va,
253                      const void * vb )
254{
255    const Torrent * a = va;
256    const uint8_t * b_hash = vb;
257
258    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
259}
260
261static Torrent*
262getExistingTorrent( tr_peerMgr *    manager,
263                    const uint8_t * hash )
264{
265    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
266                                             hash,
267                                             torrentCompareToHash );
268}
269
270static int
271peerCompare( const void * va,
272             const void * vb )
273{
274    const tr_peer * a = va;
275    const tr_peer * b = vb;
276
277    return compareAddresses( &a->in_addr, &b->in_addr );
278}
279
280static int
281peerCompareToAddr( const void * va,
282                   const void * vb )
283{
284    const tr_peer * a = va;
285
286    return compareAddresses( &a->in_addr, vb );
287}
288
289static tr_peer*
290getExistingPeer( Torrent *              torrent,
291                 const struct in_addr * in_addr )
292{
293    assert( torrentIsLocked( torrent ) );
294    assert( in_addr );
295
296    return tr_ptrArrayFindSorted( torrent->peers,
297                                  in_addr,
298                                  peerCompareToAddr );
299}
300
301static struct peer_atom*
302getExistingAtom( const                  Torrent * t,
303                 const struct in_addr * addr )
304{
305    assert( torrentIsLocked( t ) );
306    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
307}
308
309static int
310peerIsInUse( const Torrent *        ct,
311             const struct in_addr * addr )
312{
313    Torrent * t = (Torrent*) ct;
314
315    assert( torrentIsLocked ( t ) );
316
317    return getExistingPeer( t, addr )
318           || getExistingHandshake( t->outgoingHandshakes, addr )
319           || getExistingHandshake( t->manager->incomingHandshakes, addr );
320}
321
322static tr_peer*
323peerConstructor( const struct in_addr * in_addr )
324{
325    tr_peer * p;
326
327    p = tr_new0( tr_peer, 1 );
328    memcpy( &p->in_addr, in_addr, sizeof( struct in_addr ) );
329    p->pieceSpeed[TR_CLIENT_TO_PEER] = tr_rcInit( );
330    p->pieceSpeed[TR_PEER_TO_CLIENT] = tr_rcInit( );
331    return p;
332}
333
334static tr_peer*
335getPeer( Torrent *              torrent,
336         const struct in_addr * in_addr )
337{
338    tr_peer * peer;
339
340    assert( torrentIsLocked( torrent ) );
341
342    peer = getExistingPeer( torrent, in_addr );
343
344    if( peer == NULL )
345    {
346        peer = peerConstructor( in_addr );
347        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
348    }
349
350    return peer;
351}
352
353static void
354peerDestructor( tr_peer * peer )
355{
356    assert( peer );
357    assert( peer->msgs );
358
359    tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
360    tr_peerMsgsFree( peer->msgs );
361
362    tr_peerIoFree( peer->io );
363
364    tr_bitfieldFree( peer->have );
365    tr_bitfieldFree( peer->blame );
366    tr_free( peer->client );
367
368    tr_rcClose( peer->pieceSpeed[TR_CLIENT_TO_PEER] );
369    tr_rcClose( peer->pieceSpeed[TR_PEER_TO_CLIENT] );
370    tr_free( peer );
371}
372
373static void
374removePeer( Torrent * t,
375            tr_peer * peer )
376{
377    tr_peer *          removed;
378    struct peer_atom * atom;
379
380    assert( torrentIsLocked( t ) );
381
382    atom = getExistingAtom( t, &peer->in_addr );
383    assert( atom );
384    atom->time = time( NULL );
385
386    removed = tr_ptrArrayRemoveSorted( t->peers, peer, peerCompare );
387    assert( removed == peer );
388    peerDestructor( removed );
389}
390
391static void
392removeAllPeers( Torrent * t )
393{
394    while( !tr_ptrArrayEmpty( t->peers ) )
395        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
396}
397
398static void
399torrentDestructor( void * vt )
400{
401    Torrent * t = vt;
402    uint8_t   hash[SHA_DIGEST_LENGTH];
403
404    assert( t );
405    assert( !t->isRunning );
406    assert( t->peers );
407    assert( torrentIsLocked( t ) );
408    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
409    assert( tr_ptrArrayEmpty( t->peers ) );
410
411    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
412
413    tr_timerFree( &t->reconnectTimer );
414    tr_timerFree( &t->rechokeTimer );
415    tr_timerFree( &t->refillTimer );
416
417    tr_ptrArrayFree( t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
418    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
419    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
420    tr_ptrArrayFree( t->peers, NULL );
421
422    tr_free( t->pendingRequestCount );
423    tr_free( t );
424}
425
426static void peerCallbackFunc( void * vpeer,
427                              void * vevent,
428                              void * vt );
429
430static Torrent*
431torrentConstructor( tr_peerMgr * manager,
432                    tr_torrent * tor )
433{
434    int       i;
435    Torrent * t;
436
437    t = tr_new0( Torrent, 1 );
438    t->manager = manager;
439    t->tor = tor;
440    t->pool = tr_ptrArrayNew( );
441    t->peers = tr_ptrArrayNew( );
442    t->webseeds = tr_ptrArrayNew( );
443    t->outgoingHandshakes = tr_ptrArrayNew( );
444    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
445
446    for( i = 0; i < tor->info.webseedCount; ++i )
447    {
448        tr_webseed * w =
449            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc,
450                           t );
451        tr_ptrArrayAppend( t->webseeds, w );
452    }
453
454    return t;
455}
456
457/**
458 * For explanation, see http://www.bittorrent.org/fast_extensions.html
459 * Also see the "test-allowed-set" unit test
460 *
461 * @param k number of pieces in set
462 * @param sz number of pieces in the torrent
463 * @param infohash torrent's SHA1 hash
464 * @param ip peer's address
465 */
466struct tr_bitfield *
467tr_peerMgrGenerateAllowedSet(
468    const uint32_t         k,
469    const uint32_t         sz,
470    const                  uint8_t        *
471                           infohash,
472    const struct in_addr * ip )
473{
474    uint8_t       w[SHA_DIGEST_LENGTH + 4];
475    uint8_t       x[SHA_DIGEST_LENGTH];
476    tr_bitfield * a;
477    uint32_t      a_size;
478
479    *(uint32_t*)w = ntohl( htonl( ip->s_addr ) & 0xffffff00 );   /* (1) */
480    memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );              /* (2) */
481    tr_sha1( x, w, sizeof( w ), NULL );                        /* (3) */
482
483    a = tr_bitfieldNew( sz );
484    a_size = 0;
485
486    while( a_size < k )
487    {
488        int i;
489        for( i = 0; i < 5 && a_size < k; ++i )                      /* (4) */
490        {
491            uint32_t j = i * 4;                                /* (5) */
492            uint32_t y = ntohl( *( uint32_t* )( x + j ) );             /* (6) */
493            uint32_t index = y % sz;                           /* (7) */
494            if( !tr_bitfieldHas( a, index ) )                  /* (8) */
495            {
496                tr_bitfieldAdd( a, index );                    /* (9) */
497                ++a_size;
498            }
499        }
500        tr_sha1( x, x, sizeof( x ), NULL );                    /* (3) */
501    }
502
503    return a;
504}
505
506static int bandwidthPulse( void * vmgr );
507
508tr_peerMgr*
509tr_peerMgrNew( tr_session * session )
510{
511    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
512
513    m->session = session;
514    m->torrents = tr_ptrArrayNew( );
515    m->incomingHandshakes = tr_ptrArrayNew( );
516    m->bandwidthPulseNumber = -1;
517    m->bandwidthTimer = tr_timerNew( session, bandwidthPulse,
518                                     m, 1000 / BANDWIDTH_PULSES_PER_SECOND );
519    return m;
520}
521
522void
523tr_peerMgrFree( tr_peerMgr * manager )
524{
525    managerLock( manager );
526
527    tr_timerFree( &manager->bandwidthTimer );
528
529    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
530     * the item from manager->handshakes, so this is a little roundabout... */
531    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
532        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
533
534    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
535
536    /* free the torrents. */
537    tr_ptrArrayFree( manager->torrents, torrentDestructor );
538
539    managerUnlock( manager );
540    tr_free( manager );
541}
542
543static tr_peer**
544getConnectedPeers( Torrent * t,
545                   int *     setmeCount )
546{
547    int       i, peerCount, connectionCount;
548    tr_peer **peers;
549    tr_peer **ret;
550
551    assert( torrentIsLocked( t ) );
552
553    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
554    ret = tr_new( tr_peer *, peerCount );
555
556    for( i = connectionCount = 0; i < peerCount; ++i )
557        if( peers[i]->msgs )
558            ret[connectionCount++] = peers[i];
559
560    *setmeCount = connectionCount;
561    return ret;
562}
563
564static int
565clientIsDownloadingFrom( const tr_peer * peer )
566{
567    return peer->clientIsInterested && !peer->clientIsChoked;
568}
569
570static int
571clientIsUploadingTo( const tr_peer * peer )
572{
573    return peer->peerIsInterested && !peer->peerIsChoked;
574}
575
576/***
577****
578***/
579
580int
581tr_peerMgrPeerIsSeed( const tr_peerMgr *     mgr,
582                      const uint8_t *        torrentHash,
583                      const struct in_addr * addr )
584{
585    int                      isSeed = FALSE;
586    const Torrent *          t = NULL;
587    const struct peer_atom * atom = NULL;
588
589    t = getExistingTorrent( (tr_peerMgr*)mgr, torrentHash );
590    if( t )
591        atom = getExistingAtom( t, addr );
592    if( atom )
593        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
594
595    return isSeed;
596}
597
598/****
599*****
600*****  REFILL
601*****
602****/
603
604static void
605assertValidPiece( Torrent * t, tr_piece_index_t piece )
606{
607    assert( t );
608    assert( t->tor );
609    assert( piece < t->tor->info.pieceCount );
610}
611
612static int
613getPieceRequests( Torrent * t, tr_piece_index_t piece )
614{
615    assertValidPiece( t, piece );
616
617    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
618}
619
620static void
621incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
622{
623    assertValidPiece( t, piece );
624
625    if( t->pendingRequestCount == NULL )
626        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
627    t->pendingRequestCount[piece]++;
628}
629
630static void
631decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
632{
633    assertValidPiece( t, piece );
634
635    if( t->pendingRequestCount )
636        t->pendingRequestCount[piece]--;
637}
638
639struct tr_refill_piece
640{
641    tr_priority_t    priority;
642    uint32_t         piece;
643    uint32_t         peerCount;
644    int              random;
645    int              pendingRequestCount;
646    int              missingBlockCount;
647};
648
649static int
650compareRefillPiece( const void * aIn,
651                    const void * bIn )
652{
653    const struct tr_refill_piece * a = aIn;
654    const struct tr_refill_piece * b = bIn;
655
656    /* if one piece has a higher priority, it goes first */
657    if( a->priority != b->priority )
658        return a->priority > b->priority ? -1 : 1;
659
660    /* have a per-priority endgame */
661    if( a->pendingRequestCount != b->pendingRequestCount )
662        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
663
664    /* fewer missing pieces goes first */
665    if( a->missingBlockCount != b->missingBlockCount )
666        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
667
668    /* otherwise if one has fewer peers, it goes first */
669    if( a->peerCount != b->peerCount )
670        return a->peerCount < b->peerCount ? -1 : 1;
671
672    /* otherwise go with our random seed */
673    if( a->random != b->random )
674        return a->random < b->random ? -1 : 1;
675
676    return 0;
677}
678
679static tr_piece_index_t *
680getPreferredPieces( Torrent           * t,
681                    tr_piece_index_t  * pieceCount )
682{
683    const tr_torrent  * tor = t->tor;
684    const tr_info     * inf = &tor->info;
685    tr_piece_index_t    i;
686    tr_piece_index_t    poolSize = 0;
687    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
688    int                 peerCount;
689    tr_peer**           peers;
690
691    assert( torrentIsLocked( t ) );
692
693    peers = getConnectedPeers( t, &peerCount );
694
695    /* make a list of the pieces that we want but don't have */
696    for( i = 0; i < inf->pieceCount; ++i )
697        if( !tor->info.pieces[i].dnd
698                && !tr_cpPieceIsComplete( tor->completion, i ) )
699            pool[poolSize++] = i;
700
701    /* sort the pool by which to request next */
702    if( poolSize > 1 )
703    {
704        tr_piece_index_t j;
705        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
706
707        for( j = 0; j < poolSize; ++j )
708        {
709            int k;
710            const tr_piece_index_t piece = pool[j];
711            struct tr_refill_piece * setme = p + j;
712
713            setme->piece = piece;
714            setme->priority = inf->pieces[piece].priority;
715            setme->peerCount = 0;
716            setme->random = tr_cryptoWeakRandInt( INT_MAX );
717            setme->pendingRequestCount = getPieceRequests( t, piece );
718            setme->missingBlockCount
719                         = tr_cpMissingBlocksInPiece( tor->completion, piece );
720
721            for( k = 0; k < peerCount; ++k )
722            {
723                const tr_peer * peer = peers[k];
724                if( peer->peerIsInterested
725                        && !peer->clientIsChoked
726                        && tr_bitfieldHas( peer->have, piece ) )
727                    ++setme->peerCount;
728            }
729        }
730
731        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
732               compareRefillPiece );
733
734        for( j = 0; j < poolSize; ++j )
735            pool[j] = p[j].piece;
736
737        tr_free( p );
738    }
739
740    tr_free( peers );
741
742    *pieceCount = poolSize;
743    return pool;
744}
745
746struct tr_blockIterator
747{
748    Torrent * t;
749    tr_block_index_t blockIndex, blockCount, *blocks;
750    tr_piece_index_t pieceIndex, pieceCount, *pieces;
751};
752
753static struct tr_blockIterator*
754blockIteratorNew( Torrent * t )
755{
756    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
757    i->t = t;
758    i->pieces = getPreferredPieces( t, &i->pieceCount );
759    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCount );
760    return i;
761}
762
763static int
764blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
765{
766    int found;
767    Torrent * t = i->t;
768    tr_torrent * tor = t->tor;
769
770    while( ( i->blockIndex == i->blockCount )
771        && ( i->pieceIndex < i->pieceCount ) )
772    {
773        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
774        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
775        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
776        tr_block_index_t block;
777
778        assert( index < tor->info.pieceCount );
779
780        i->blockCount = 0;
781        i->blockIndex = 0;
782        for( block=b; block!=e; ++block )
783            if( !tr_cpBlockIsComplete( tor->completion, block ) )
784                i->blocks[i->blockCount++] = block;
785    }
786
787    if(( found = ( i->blockIndex < i->blockCount )))
788        *setme = i->blocks[i->blockIndex++];
789
790    return found;
791}
792
793static void
794blockIteratorFree( struct tr_blockIterator * i )
795{
796    tr_free( i->blocks );
797    tr_free( i->pieces );
798    tr_free( i );
799}
800
801static tr_peer**
802getPeersUploadingToClient( Torrent * t,
803                           int *     setmeCount )
804{
805    int j;
806    int peerCount = 0;
807    int retCount = 0;
808    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
809    tr_peer ** ret = tr_new( tr_peer *, peerCount );
810
811    j = 0; /* this is a temporary test to make sure we walk through all the peers */
812    if( peerCount )
813    {
814        /* Get a list of peers we're downloading from.
815           Pick a different starting point each time so all peers
816           get a chance at being the first in line */
817        const int fencepost = tr_cryptoWeakRandInt( peerCount );
818        int i = fencepost;
819        do {
820            if( clientIsDownloadingFrom( peers[i] ) )
821                ret[retCount++] = peers[i];
822            i = ( i + 1 ) % peerCount;
823            ++j;
824        } while( i != fencepost );
825    }
826    assert( j == peerCount );
827    *setmeCount = retCount;
828    return ret;
829}
830
831static uint32_t
832getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
833{
834    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
835    const uint64_t blockPos = tor->blockSize * b;
836    assert( blockPos >= piecePos );
837    return (uint32_t)( blockPos - piecePos );
838}
839
840static int
841refillPulse( void * vtorrent )
842{
843    tr_block_index_t block;
844    int peerCount;
845    int webseedCount;
846    tr_peer ** peers;
847    tr_webseed ** webseeds;
848    struct tr_blockIterator * blockIterator;
849    Torrent * t = vtorrent;
850    tr_torrent * tor = t->tor;
851
852    if( !t->isRunning )
853        return TRUE;
854    if( tr_torrentIsSeed( t->tor ) )
855        return TRUE;
856
857    torrentLock( t );
858    tordbg( t, "Refilling Request Buffers..." );
859
860    blockIterator = blockIteratorNew( t );
861    peers = getPeersUploadingToClient( t, &peerCount );
862    webseedCount = tr_ptrArraySize( t->webseeds );
863    webseeds = tr_memdup( tr_ptrArrayBase( t->webseeds ),
864                          webseedCount * sizeof( tr_webseed* ) );
865
866    while( ( webseedCount || peerCount )
867        && blockIteratorNext( blockIterator, &block ) )
868    {
869        int j;
870        int handled = FALSE;
871
872        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
873        const uint32_t offset = getBlockOffsetInPiece( tor, block );
874        const uint32_t length = tr_torBlockCountBytes( tor, block );
875
876        /* find a peer who can ask for this block */
877        for( j=0; !handled && j<peerCount; )
878        {
879            const int val = tr_peerMsgsAddRequest( peers[j]->msgs,
880                                                   index, offset, length );
881            switch( val )
882            {
883                case TR_ADDREQ_FULL:
884                case TR_ADDREQ_CLIENT_CHOKED:
885                    peers[j] = peers[--peerCount];
886                    break;
887
888                case TR_ADDREQ_MISSING:
889                case TR_ADDREQ_DUPLICATE:
890                    ++j;
891                    break;
892
893                case TR_ADDREQ_OK:
894                    incrementPieceRequests( t, index );
895                    handled = TRUE;
896                    break;
897
898                default:
899                    assert( 0 && "unhandled value" );
900                    break;
901            }
902        }
903
904        /* maybe one of the webseeds can do it */
905        for( j=0; !handled && j<webseedCount; )
906        {
907            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j],
908                                                          index, offset, length );
909            switch( val )
910            {
911                case TR_ADDREQ_FULL:
912                    webseeds[j] = webseeds[--webseedCount];
913                    break;
914
915                case TR_ADDREQ_OK:
916                    incrementPieceRequests( t, index );
917                    handled = TRUE;
918                    break;
919
920                default:
921                    assert( 0 && "unhandled value" );
922                    break;
923            }
924        }
925    }
926
927    /* cleanup */
928    blockIteratorFree( blockIterator );
929    tr_free( webseeds );
930    tr_free( peers );
931
932    t->refillTimer = NULL;
933    torrentUnlock( t );
934    return FALSE;
935}
936
937static void
938broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
939{
940    int i, size;
941    tr_peer ** peers;
942
943    assert( torrentIsLocked( t ) );
944
945    peers = getConnectedPeers( t, &size );
946    for( i=0; i<size; ++i )
947        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
948    tr_free( peers );
949}
950
951static void
952addStrike( Torrent * t,
953           tr_peer * peer )
954{
955    tordbg( t, "increasing peer %s strike count to %d",
956            tr_peerIoAddrStr( &peer->in_addr,
957                              peer->port ), peer->strikes + 1 );
958
959    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
960    {
961        struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
962        atom->myflags |= MYFLAG_BANNED;
963        peer->doPurge = 1;
964        tordbg( t, "banning peer %s",
965               tr_peerIoAddrStr( &atom->addr, atom->port ) );
966    }
967}
968
969static void
970gotBadPiece( Torrent *        t,
971             tr_piece_index_t pieceIndex )
972{
973    tr_torrent *   tor = t->tor;
974    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
975
976    tor->corruptCur += byteCount;
977    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
978}
979
980static void
981refillSoon( Torrent * t )
982{
983    if( t->refillTimer == NULL )
984        t->refillTimer = tr_timerNew( t->manager->session,
985                                      refillPulse, t,
986                                      REFILL_PERIOD_MSEC );
987}
988
989static void
990peerCallbackFunc( void * vpeer,
991                  void * vevent,
992                  void * vt )
993{
994    tr_peer *             peer = vpeer; /* may be NULL if peer is a webseed */
995    Torrent *             t = (Torrent *) vt;
996    const tr_peer_event * e = vevent;
997
998    torrentLock( t );
999
1000    switch( e->eventType )
1001    {
1002        case TR_PEER_NEED_REQ:
1003            refillSoon( t );
1004            break;
1005
1006        case TR_PEER_CANCEL:
1007            decrementPieceRequests( t, e->pieceIndex );
1008            break;
1009
1010        case TR_PEER_PEER_GOT_DATA:
1011        {
1012            const time_t now = time( NULL );
1013            tr_torrent * tor = t->tor;
1014            tor->activityDate = now;
1015            tor->uploadedCur += e->length;
1016            if( peer )
1017                tr_rcTransferred ( peer->pieceSpeed[TR_CLIENT_TO_PEER], e->length );
1018            tr_rcTransferred ( tor->pieceSpeed[TR_CLIENT_TO_PEER], e->length );
1019            tr_rcTransferred ( tor->session->pieceSpeed[TR_CLIENT_TO_PEER], e->length );
1020            tr_statsAddUploaded( tor->session, e->length );
1021            if( peer )
1022            {
1023                struct peer_atom * a = getExistingAtom( t, &peer->in_addr );
1024                a->piece_data_time = now;
1025            }
1026            break;
1027        }
1028
1029        case TR_PEER_CLIENT_GOT_DATA:
1030        {
1031            const time_t now = time( NULL );
1032            tr_torrent * tor = t->tor;
1033            tor->activityDate = now;
1034            tr_statsAddDownloaded( tor->session, e->length );
1035            if( peer )
1036                tr_rcTransferred ( peer->pieceSpeed[TR_PEER_TO_CLIENT], e->length );
1037            tr_rcTransferred ( tor->pieceSpeed[TR_PEER_TO_CLIENT], e->length );
1038            tr_rcTransferred ( tor->session->pieceSpeed[TR_PEER_TO_CLIENT], e->length );
1039            /* only add this to downloadedCur if we got it from a peer --
1040             * webseeds shouldn't count against our ratio.  As one tracker
1041             * admin put it, "Those pieces are downloaded directly from the
1042             * content distributor, not the peers, it is the tracker's job
1043             * to manage the swarms, not the web server and does not fit
1044             * into the jurisdiction of the tracker." */
1045            if( peer )
1046                tor->downloadedCur += e->length;
1047            if( peer ) {
1048                struct peer_atom * a = getExistingAtom( t, &peer->in_addr );
1049                a->piece_data_time = now;
1050            }
1051            break;
1052        }
1053
1054        case TR_PEER_PEER_PROGRESS:
1055        {
1056            if( peer )
1057            {
1058                struct peer_atom * atom = getExistingAtom( t,
1059                                                           &peer->in_addr );
1060                const int          peerIsSeed = e->progress >= 1.0;
1061                if( peerIsSeed )
1062                {
1063                    tordbg( t, "marking peer %s as a seed",
1064                           tr_peerIoAddrStr( &atom->addr,
1065                                             atom->port ) );
1066                    atom->flags |= ADDED_F_SEED_FLAG;
1067                }
1068                else
1069                {
1070                    tordbg( t, "marking peer %s as a non-seed",
1071                           tr_peerIoAddrStr( &atom->addr,
1072                                             atom->port ) );
1073                    atom->flags &= ~ADDED_F_SEED_FLAG;
1074                }
1075            }
1076            break;
1077        }
1078
1079        case TR_PEER_CLIENT_GOT_BLOCK:
1080        {
1081            tr_torrent *     tor = t->tor;
1082
1083            tr_block_index_t block = _tr_block( tor, e->pieceIndex,
1084                                                e->offset );
1085
1086            tr_cpBlockAdd( tor->completion, block );
1087            decrementPieceRequests( t, e->pieceIndex );
1088
1089            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1090
1091            if( tr_cpPieceIsComplete( tor->completion, e->pieceIndex ) )
1092            {
1093                const tr_piece_index_t p = e->pieceIndex;
1094                const int              ok = tr_ioTestPiece( tor, p );
1095
1096                if( !ok )
1097                {
1098                    tr_torerr( tor,
1099                              _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1100                              (unsigned long)p );
1101                }
1102
1103                tr_torrentSetHasPiece( tor, p, ok );
1104                tr_torrentSetPieceChecked( tor, p, TRUE );
1105                tr_peerMgrSetBlame( tor->session->peerMgr, tor->info.hash, p, ok );
1106
1107                if( !ok )
1108                    gotBadPiece( t, p );
1109                else
1110                {
1111                    int        i, peerCount;
1112                    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1113                    for( i = 0; i < peerCount; ++i )
1114                        tr_peerMsgsHave( peers[i]->msgs, p );
1115                    tr_free( peers );
1116                }
1117
1118                tr_torrentRecheckCompleteness( tor );
1119            }
1120            break;
1121        }
1122
1123        case TR_PEER_ERROR:
1124            if( e->err == EINVAL )
1125            {
1126                addStrike( t, peer );
1127                peer->doPurge = 1;
1128            }
1129            else if( ( e->err == ERANGE )
1130                  || ( e->err == EMSGSIZE )
1131                  || ( e->err == ENOTCONN ) )
1132            {
1133                /* some protocol error from the peer */
1134                peer->doPurge = 1;
1135            }
1136            else /* a local error, such as an IO error */
1137            {
1138                t->tor->error = e->err;
1139                tr_strlcpy( t->tor->errorString,
1140                            tr_strerror( t->tor->error ),
1141                            sizeof( t->tor->errorString ) );
1142                tr_torrentStop( t->tor );
1143            }
1144            break;
1145
1146        default:
1147            assert( 0 );
1148    }
1149
1150    torrentUnlock( t );
1151}
1152
1153static void
1154ensureAtomExists( Torrent *              t,
1155                  const struct in_addr * addr,
1156                  uint16_t               port,
1157                  uint8_t                flags,
1158                  uint8_t                from )
1159{
1160    if( getExistingAtom( t, addr ) == NULL )
1161    {
1162        struct peer_atom * a;
1163        a = tr_new0( struct peer_atom, 1 );
1164        a->addr = *addr;
1165        a->port = port;
1166        a->flags = flags;
1167        a->from = from;
1168        tordbg( t, "got a new atom: %s",
1169               tr_peerIoAddrStr( &a->addr, a->port ) );
1170        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
1171    }
1172}
1173
1174static int
1175getMaxPeerCount( const tr_torrent * tor )
1176{
1177    return tor->maxConnectedPeers;
1178}
1179
1180static int
1181getPeerCount( const Torrent * t )
1182{
1183    return tr_ptrArraySize( t->peers ) + tr_ptrArraySize(
1184               t->outgoingHandshakes );
1185}
1186
1187/* FIXME: this is kind of a mess. */
1188static int
1189myHandshakeDoneCB( tr_handshake *  handshake,
1190                   tr_peerIo *     io,
1191                   int             isConnected,
1192                   const uint8_t * peer_id,
1193                   void *          vmanager )
1194{
1195    int                    ok = isConnected;
1196    int                    success = FALSE;
1197    uint16_t               port;
1198    const struct in_addr * addr;
1199    tr_peerMgr *           manager = (tr_peerMgr*) vmanager;
1200    Torrent *              t;
1201    tr_handshake *         ours;
1202
1203    assert( io );
1204    assert( isConnected == 0 || isConnected == 1 );
1205
1206    t = tr_peerIoHasTorrentHash( io )
1207        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1208        : NULL;
1209
1210    if( tr_peerIoIsIncoming ( io ) )
1211        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
1212                                        handshake, handshakeCompare );
1213    else if( t )
1214        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
1215                                        handshake, handshakeCompare );
1216    else
1217        ours = handshake;
1218
1219    assert( ours );
1220    assert( ours == handshake );
1221
1222    if( t )
1223        torrentLock( t );
1224
1225    addr = tr_peerIoGetAddress( io, &port );
1226
1227    if( !ok || !t || !t->isRunning )
1228    {
1229        if( t )
1230        {
1231            struct peer_atom * atom = getExistingAtom( t, addr );
1232            if( atom )
1233                ++atom->numFails;
1234        }
1235
1236        tr_peerIoFree( io );
1237    }
1238    else /* looking good */
1239    {
1240        struct peer_atom * atom;
1241        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1242        atom = getExistingAtom( t, addr );
1243        atom->time = time( NULL );
1244
1245        if( atom->myflags & MYFLAG_BANNED )
1246        {
1247            tordbg( t, "banned peer %s tried to reconnect",
1248                   tr_peerIoAddrStr( &atom->addr,
1249                                     atom->port ) );
1250            tr_peerIoFree( io );
1251        }
1252        else if( tr_peerIoIsIncoming( io )
1253               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1254
1255        {
1256            tr_peerIoFree( io );
1257        }
1258        else
1259        {
1260            tr_peer * peer = getExistingPeer( t, addr );
1261
1262            if( peer ) /* we already have this peer */
1263            {
1264                tr_peerIoFree( io );
1265            }
1266            else
1267            {
1268                peer = getPeer( t, addr );
1269                tr_free( peer->client );
1270
1271                if( !peer_id )
1272                    peer->client = NULL;
1273                else
1274                {
1275                    char client[128];
1276                    tr_clientForId( client, sizeof( client ), peer_id );
1277                    peer->client = tr_strdup( client );
1278                }
1279                peer->port = port;
1280                peer->io = io;
1281                peer->msgs =
1282                    tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t,
1283                                    &peer->msgsTag );
1284
1285                success = TRUE;
1286            }
1287        }
1288    }
1289
1290    if( t )
1291        torrentUnlock( t );
1292
1293    return success;
1294}
1295
1296void
1297tr_peerMgrAddIncoming( tr_peerMgr *     manager,
1298                       struct in_addr * addr,
1299                       uint16_t         port,
1300                       int              socket )
1301{
1302    managerLock( manager );
1303
1304    if( tr_sessionIsAddressBlocked( manager->session, addr ) )
1305    {
1306        tr_dbg( "Banned IP address \"%s\" tried to connect to us",
1307               inet_ntoa( *addr ) );
1308        tr_netClose( socket );
1309    }
1310    else if( getExistingHandshake( manager->incomingHandshakes, addr ) )
1311    {
1312        tr_netClose( socket );
1313    }
1314    else /* we don't have a connetion to them yet... */
1315    {
1316        tr_peerIo *    io;
1317        tr_handshake * handshake;
1318
1319        io = tr_peerIoNewIncoming( manager->session, addr, port, socket );
1320
1321        handshake = tr_handshakeNew( io,
1322                                     manager->session->encryptionMode,
1323                                     myHandshakeDoneCB,
1324                                     manager );
1325
1326        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake,
1327                                 handshakeCompare );
1328    }
1329
1330    managerUnlock( manager );
1331}
1332
1333void
1334tr_peerMgrAddPex( tr_peerMgr *    manager,
1335                  const uint8_t * torrentHash,
1336                  uint8_t         from,
1337                  const tr_pex *  pex )
1338{
1339    Torrent * t;
1340
1341    managerLock( manager );
1342
1343    t = getExistingTorrent( manager, torrentHash );
1344    if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->in_addr ) )
1345        ensureAtomExists( t, &pex->in_addr, pex->port, pex->flags, from );
1346
1347    managerUnlock( manager );
1348}
1349
1350tr_pex *
1351tr_peerMgrCompactToPex( const void *    compact,
1352                        size_t          compactLen,
1353                        const uint8_t * added_f,
1354                        size_t          added_f_len,
1355                        size_t *        pexCount )
1356{
1357    size_t          i;
1358    size_t          n = compactLen / 6;
1359    const uint8_t * walk = compact;
1360    tr_pex *        pex = tr_new0( tr_pex, n );
1361
1362    for( i = 0; i < n; ++i )
1363    {
1364        memcpy( &pex[i].in_addr, walk, 4 ); walk += 4;
1365        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1366        if( added_f && ( n == added_f_len ) )
1367            pex[i].flags = added_f[i];
1368    }
1369
1370    *pexCount = n;
1371    return pex;
1372}
1373
1374/**
1375***
1376**/
1377
1378void
1379tr_peerMgrSetBlame( tr_peerMgr *     manager,
1380                    const uint8_t *  torrentHash,
1381                    tr_piece_index_t pieceIndex,
1382                    int              success )
1383{
1384    if( !success )
1385    {
1386        int        peerCount, i;
1387        Torrent *  t = getExistingTorrent( manager, torrentHash );
1388        tr_peer ** peers;
1389
1390        assert( torrentIsLocked( t ) );
1391
1392        peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1393        for( i = 0; i < peerCount; ++i )
1394        {
1395            tr_peer * peer = peers[i];
1396            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1397            {
1398                tordbg(
1399                    t,
1400                    "peer %s contributed to corrupt piece (%d); now has %d strikes",
1401                    tr_peerIoAddrStr( &peer->in_addr, peer->port ),
1402                    pieceIndex, (int)peer->strikes + 1 );
1403                addStrike( t, peer );
1404            }
1405        }
1406    }
1407}
1408
1409int
1410tr_pexCompare( const void * va,
1411               const void * vb )
1412{
1413    const tr_pex * a = va;
1414    const tr_pex * b = vb;
1415    int            i =
1416        memcmp( &a->in_addr, &b->in_addr, sizeof( struct in_addr ) );
1417
1418    if( i ) return i;
1419    if( a->port < b->port ) return -1;
1420    if( a->port > b->port ) return 1;
1421    return 0;
1422}
1423
1424int tr_pexCompare( const void * a,
1425                   const void * b );
1426
1427static int
1428peerPrefersCrypto( const tr_peer * peer )
1429{
1430    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1431        return TRUE;
1432
1433    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1434        return FALSE;
1435
1436    return tr_peerIoIsEncrypted( peer->io );
1437}
1438
1439int
1440tr_peerMgrGetPeers( tr_peerMgr *    manager,
1441                    const uint8_t * torrentHash,
1442                    tr_pex **       setme_pex )
1443{
1444    int peerCount = 0;
1445    const Torrent *  t;
1446
1447    managerLock( manager );
1448
1449    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1450    if( !t )
1451    {
1452        *setme_pex = NULL;
1453    }
1454    else
1455    {
1456        int i;
1457        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1458        tr_pex * pex = tr_new( tr_pex, peerCount );
1459        tr_pex * walk = pex;
1460
1461        for( i = 0; i < peerCount; ++i, ++walk )
1462        {
1463            const tr_peer * peer = peers[i];
1464            walk->in_addr = peer->in_addr;
1465            walk->port = peer->port;
1466            walk->flags = 0;
1467            if( peerPrefersCrypto( peer ) ) walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1468            if( peer->progress >= 1.0 ) walk->flags |= ADDED_F_SEED_FLAG;
1469        }
1470
1471        assert( ( walk - pex ) == peerCount );
1472        qsort( pex, peerCount, sizeof( tr_pex ), tr_pexCompare );
1473        *setme_pex = pex;
1474    }
1475
1476    managerUnlock( manager );
1477    return peerCount;
1478}
1479
1480static int reconnectPulse( void * vtorrent );
1481
1482static int rechokePulse( void * vtorrent );
1483
1484void
1485tr_peerMgrStartTorrent( tr_peerMgr *    manager,
1486                        const uint8_t * torrentHash )
1487{
1488    Torrent * t;
1489
1490    managerLock( manager );
1491
1492    t = getExistingTorrent( manager, torrentHash );
1493
1494    assert( t );
1495    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1496    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1497
1498    if( !t->isRunning )
1499    {
1500        t->isRunning = 1;
1501
1502        t->reconnectTimer = tr_timerNew( t->manager->session,
1503                                         reconnectPulse, t,
1504                                         RECONNECT_PERIOD_MSEC );
1505
1506        t->rechokeTimer = tr_timerNew( t->manager->session,
1507                                       rechokePulse, t,
1508                                       RECHOKE_PERIOD_MSEC );
1509
1510        reconnectPulse( t );
1511
1512        rechokePulse( t );
1513
1514        if( !tr_ptrArrayEmpty( t->webseeds ) )
1515            refillSoon( t );
1516    }
1517
1518    managerUnlock( manager );
1519}
1520
1521static void
1522stopTorrent( Torrent * t )
1523{
1524    assert( torrentIsLocked( t ) );
1525
1526    t->isRunning = 0;
1527    tr_timerFree( &t->rechokeTimer );
1528    tr_timerFree( &t->reconnectTimer );
1529
1530    /* disconnect the peers. */
1531    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)peerDestructor );
1532    tr_ptrArrayClear( t->peers );
1533
1534    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1535     * which removes the handshake from t->outgoingHandshakes... */
1536    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1537        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1538}
1539
1540void
1541tr_peerMgrStopTorrent( tr_peerMgr *    manager,
1542                       const uint8_t * torrentHash )
1543{
1544    managerLock( manager );
1545
1546    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1547
1548    managerUnlock( manager );
1549}
1550
1551void
1552tr_peerMgrAddTorrent( tr_peerMgr * manager,
1553                      tr_torrent * tor )
1554{
1555    Torrent * t;
1556
1557    managerLock( manager );
1558
1559    assert( tor );
1560    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1561
1562    t = torrentConstructor( manager, tor );
1563    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1564
1565    managerUnlock( manager );
1566}
1567
1568void
1569tr_peerMgrRemoveTorrent( tr_peerMgr *    manager,
1570                         const uint8_t * torrentHash )
1571{
1572    Torrent * t;
1573
1574    managerLock( manager );
1575
1576    t = getExistingTorrent( manager, torrentHash );
1577    assert( t );
1578    stopTorrent( t );
1579    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1580    torrentDestructor( t );
1581
1582    managerUnlock( manager );
1583}
1584
1585void
1586tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1587                               const uint8_t *    torrentHash,
1588                               int8_t *           tab,
1589                               unsigned int       tabCount )
1590{
1591    tr_piece_index_t   i;
1592    const Torrent *    t;
1593    const tr_torrent * tor;
1594    float              interval;
1595    int                isComplete;
1596    int                peerCount;
1597    const tr_peer **   peers;
1598
1599    managerLock( manager );
1600
1601    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1602    tor = t->tor;
1603    interval = tor->info.pieceCount / (float)tabCount;
1604    isComplete = tor
1605                 && ( tr_cpGetStatus ( tor->completion ) == TR_CP_COMPLETE );
1606    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1607
1608    memset( tab, 0, tabCount );
1609
1610    for( i = 0; tor && i < tabCount; ++i )
1611    {
1612        const int piece = i * interval;
1613
1614        if( isComplete || tr_cpPieceIsComplete( tor->completion, piece ) )
1615            tab[i] = -1;
1616        else if( peerCount )
1617        {
1618            int j;
1619            for( j = 0; j < peerCount; ++j )
1620                if( tr_bitfieldHas( peers[j]->have, i ) )
1621                    ++tab[i];
1622        }
1623    }
1624
1625    managerUnlock( manager );
1626}
1627
1628/* Returns the pieces that are available from peers */
1629tr_bitfield*
1630tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1631                        const uint8_t *    torrentHash )
1632{
1633    int           i, size;
1634    Torrent *     t;
1635    tr_peer **    peers;
1636    tr_bitfield * pieces;
1637
1638    managerLock( manager );
1639
1640    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1641    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1642    peers = getConnectedPeers( t, &size );
1643    for( i = 0; i < size; ++i )
1644        tr_bitfieldOr( pieces, peers[i]->have );
1645
1646    managerUnlock( manager );
1647    tr_free( peers );
1648    return pieces;
1649}
1650
1651int
1652tr_peerMgrHasConnections( const tr_peerMgr * manager,
1653                          const uint8_t *    torrentHash )
1654{
1655    int             ret;
1656    const Torrent * t;
1657
1658    managerLock( manager );
1659
1660    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1661    ret = t && ( !tr_ptrArrayEmpty( t->peers )
1662               || !tr_ptrArrayEmpty( t->webseeds ) );
1663
1664    managerUnlock( manager );
1665    return ret;
1666}
1667
1668void
1669tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1670                        const uint8_t *    torrentHash,
1671                        int *              setmePeersKnown,
1672                        int *              setmePeersConnected,
1673                        int *              setmeSeedsConnected,
1674                        int *              setmeWebseedsSendingToUs,
1675                        int *              setmePeersSendingToUs,
1676                        int *              setmePeersGettingFromUs,
1677                        int *              setmePeersFrom )
1678{
1679    int                 i, size;
1680    const Torrent *     t;
1681    const tr_peer **    peers;
1682    const tr_webseed ** webseeds;
1683
1684    managerLock( manager );
1685
1686    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1687    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1688
1689    *setmePeersKnown           = tr_ptrArraySize( t->pool );
1690    *setmePeersConnected       = 0;
1691    *setmeSeedsConnected       = 0;
1692    *setmePeersGettingFromUs   = 0;
1693    *setmePeersSendingToUs     = 0;
1694    *setmeWebseedsSendingToUs  = 0;
1695
1696    for( i = 0; i < TR_PEER_FROM__MAX; ++i )
1697        setmePeersFrom[i] = 0;
1698
1699    for( i = 0; i < size; ++i )
1700    {
1701        const tr_peer *          peer = peers[i];
1702        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1703
1704        if( peer->io == NULL ) /* not connected */
1705            continue;
1706
1707        ++ * setmePeersConnected;
1708
1709        ++setmePeersFrom[atom->from];
1710
1711        if( clientIsDownloadingFrom( peer ) )
1712            ++ * setmePeersSendingToUs;
1713
1714        if( clientIsUploadingTo( peer ) )
1715            ++ * setmePeersGettingFromUs;
1716
1717        if( atom->flags & ADDED_F_SEED_FLAG )
1718            ++ * setmeSeedsConnected;
1719    }
1720
1721    webseeds = (const tr_webseed **) tr_ptrArrayPeek( t->webseeds, &size );
1722    for( i = 0; i < size; ++i )
1723    {
1724        if( tr_webseedIsActive( webseeds[i] ) )
1725            ++ * setmeWebseedsSendingToUs;
1726    }
1727
1728    managerUnlock( manager );
1729}
1730
1731float*
1732tr_peerMgrWebSpeeds( const tr_peerMgr * manager,
1733                     const uint8_t *    torrentHash )
1734{
1735    const Torrent *     t;
1736    const tr_webseed ** webseeds;
1737    int                 i;
1738    int                 webseedCount;
1739    float *             ret;
1740
1741    assert( manager );
1742    managerLock( manager );
1743
1744    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1745    webseeds = (const tr_webseed**) tr_ptrArrayPeek( t->webseeds,
1746                                                     &webseedCount );
1747    assert( webseedCount == t->tor->info.webseedCount );
1748    ret = tr_new0( float, webseedCount );
1749
1750    for( i = 0; i < webseedCount; ++i )
1751        if( !tr_webseedGetSpeed( webseeds[i], &ret[i] ) )
1752            ret[i] = -1.0;
1753
1754    managerUnlock( manager );
1755    return ret;
1756}
1757
1758double
1759tr_peerGetPieceSpeed( const tr_peer    * peer,
1760                      tr_direction       direction )
1761{
1762    assert( peer );
1763    assert( direction==TR_CLIENT_TO_PEER || direction==TR_PEER_TO_CLIENT );
1764
1765    return tr_rcRate( peer->pieceSpeed[direction] );
1766}
1767
1768
1769struct tr_peer_stat *
1770tr_peerMgrPeerStats( const   tr_peerMgr  * manager,
1771                     const   uint8_t     * torrentHash,
1772                     int                 * setmeCount UNUSED )
1773{
1774    int             i, size;
1775    const Torrent * t;
1776    tr_peer **      peers;
1777    tr_peer_stat *  ret;
1778
1779    assert( manager );
1780    managerLock( manager );
1781
1782    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1783    peers = getConnectedPeers( (Torrent*)t, &size );
1784    ret = tr_new0( tr_peer_stat, size );
1785
1786    for( i = 0; i < size; ++i )
1787    {
1788        char *                   pch;
1789        const tr_peer *          peer = peers[i];
1790        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1791        tr_peer_stat *           stat = ret + i;
1792
1793        tr_netNtop( &peer->in_addr, stat->addr, sizeof( stat->addr ) );
1794        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
1795                   sizeof( stat->client ) );
1796        stat->port               = peer->port;
1797        stat->from               = atom->from;
1798        stat->progress           = peer->progress;
1799        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1800        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER );
1801        stat->rateToClient       = tr_peerGetPieceSpeed( peer, TR_PEER_TO_CLIENT );
1802        stat->peerIsChoked       = peer->peerIsChoked;
1803        stat->peerIsInterested   = peer->peerIsInterested;
1804        stat->clientIsChoked     = peer->clientIsChoked;
1805        stat->clientIsInterested = peer->clientIsInterested;
1806        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1807        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1808        stat->isUploadingTo      = clientIsUploadingTo( peer );
1809
1810        pch = stat->flagStr;
1811        if( t->optimistic == peer ) *pch++ = 'O';
1812        if( stat->isDownloadingFrom ) *pch++ = 'D';
1813        else if( stat->clientIsInterested ) *pch++ = 'd';
1814        if( stat->isUploadingTo ) *pch++ = 'U';
1815        else if( stat->peerIsInterested ) *pch++ = 'u';
1816        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ =
1817                'K';
1818        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1819        if( stat->isEncrypted ) *pch++ = 'E';
1820        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1821        if( stat->isIncoming ) *pch++ = 'I';
1822        *pch = '\0';
1823    }
1824
1825    *setmeCount = size;
1826    tr_free( peers );
1827
1828    managerUnlock( manager );
1829    return ret;
1830}
1831
1832/**
1833***
1834**/
1835
1836struct ChokeData
1837{
1838    unsigned int    doUnchoke    : 1;
1839    unsigned int    isInterested : 1;
1840    double          rateToClient;
1841    double          rateToPeer;
1842    tr_peer *       peer;
1843};
1844
1845static int
1846tr_compareDouble( double a,
1847                  double b )
1848{
1849    if( a < b ) return -1;
1850    if( a > b ) return 1;
1851    return 0;
1852}
1853
1854static int
1855compareChoke( const void * va,
1856              const void * vb )
1857{
1858    const struct ChokeData * a = va;
1859    const struct ChokeData * b = vb;
1860    int                      diff = 0;
1861
1862    if( diff == 0 ) /* prefer higher dl speeds */
1863        diff = -tr_compareDouble( a->rateToClient, b->rateToClient );
1864    if( diff == 0 ) /* prefer higher ul speeds */
1865        diff = -tr_compareDouble( a->rateToPeer, b->rateToPeer );
1866    if( diff == 0 ) /* prefer unchoked */
1867        diff = (int)a->peer->peerIsChoked - (int)b->peer->peerIsChoked;
1868
1869    return diff;
1870}
1871
1872static int
1873isNew( const tr_peer * peer )
1874{
1875    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1876}
1877
1878static int
1879isSame( const tr_peer * peer )
1880{
1881    return peer && peer->client && strstr( peer->client, "Transmission" );
1882}
1883
1884/**
1885***
1886**/
1887
1888static void
1889rechoke( Torrent * t )
1890{
1891    int                i, peerCount, size, unchokedInterested;
1892    tr_peer **         peers = getConnectedPeers( t, &peerCount );
1893    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1894
1895    assert( torrentIsLocked( t ) );
1896
1897    /* sort the peers by preference and rate */
1898    for( i = 0, size = 0; i < peerCount; ++i )
1899    {
1900        tr_peer * peer = peers[i];
1901        if( peer->progress >= 1.0 ) /* choke all seeds */
1902            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1903        else
1904        {
1905            struct ChokeData * n = &choke[size++];
1906            n->peer         = peer;
1907            n->isInterested = peer->peerIsInterested;
1908            n->rateToPeer   = tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER );
1909            n->rateToClient = tr_peerGetPieceSpeed( peer, TR_PEER_TO_CLIENT );
1910        }
1911    }
1912
1913    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
1914
1915    /**
1916     * Reciprocation and number of uploads capping is managed by unchoking
1917     * the N peers which have the best upload rate and are interested.
1918     * This maximizes the client's download rate. These N peers are
1919     * referred to as downloaders, because they are interested in downloading
1920     * from the client.
1921     *
1922     * Peers which have a better upload rate (as compared to the downloaders)
1923     * but aren't interested get unchoked. If they become interested, the
1924     * downloader with the worst upload rate gets choked. If a client has
1925     * a complete file, it uses its upload rate rather than its download
1926     * rate to decide which peers to unchoke.
1927     */
1928    unchokedInterested = 0;
1929    for( i = 0; i < size && unchokedInterested < MAX_UNCHOKED_PEERS; ++i )
1930    {
1931        choke[i].doUnchoke = 1;
1932        if( choke[i].isInterested )
1933            ++unchokedInterested;
1934    }
1935
1936    /* optimistic unchoke */
1937    if( i < size )
1938    {
1939        int                n;
1940        struct ChokeData * c;
1941        tr_ptrArray *      randPool = tr_ptrArrayNew( );
1942
1943        for( ; i < size; ++i )
1944        {
1945            if( choke[i].isInterested )
1946            {
1947                const tr_peer * peer = choke[i].peer;
1948                int             x = 1, y;
1949                if( isNew( peer ) ) x *= 3;
1950                if( isSame( peer ) ) x *= 3;
1951                for( y = 0; y < x; ++y )
1952                    tr_ptrArrayAppend( randPool, &choke[i] );
1953            }
1954        }
1955
1956        if( ( n = tr_ptrArraySize( randPool ) ) )
1957        {
1958            c = tr_ptrArrayNth( randPool, tr_cryptoWeakRandInt( n ) );
1959            c->doUnchoke = 1;
1960            t->optimistic = c->peer;
1961        }
1962
1963        tr_ptrArrayFree( randPool, NULL );
1964    }
1965
1966    for( i = 0; i < size; ++i )
1967        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1968
1969    /* cleanup */
1970    tr_free( choke );
1971    tr_free( peers );
1972}
1973
1974static int
1975rechokePulse( void * vtorrent )
1976{
1977    Torrent * t = vtorrent;
1978
1979    torrentLock( t );
1980    rechoke( t );
1981    torrentUnlock( t );
1982    return TRUE;
1983}
1984
1985/***
1986****
1987****  Life and Death
1988****
1989***/
1990
1991static int
1992shouldPeerBeClosed( const Torrent * t,
1993                    const tr_peer * peer,
1994                    int             peerCount )
1995{
1996    const tr_torrent *       tor = t->tor;
1997    const time_t             now = time( NULL );
1998    const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1999
2000    /* if it's marked for purging, close it */
2001    if( peer->doPurge )
2002    {
2003        tordbg( t, "purging peer %s because its doPurge flag is set",
2004               tr_peerIoAddrStr( &atom->addr,
2005                                 atom->port ) );
2006        return TRUE;
2007    }
2008
2009    /* if we're seeding and the peer has everything we have,
2010     * and enough time has passed for a pex exchange, then disconnect */
2011    if( tr_torrentIsSeed( tor ) )
2012    {
2013        int peerHasEverything;
2014        if( atom->flags & ADDED_F_SEED_FLAG )
2015            peerHasEverything = TRUE;
2016        else if( peer->progress < tr_cpPercentDone( tor->completion ) )
2017            peerHasEverything = FALSE;
2018        else
2019        {
2020            tr_bitfield * tmp =
2021                tr_bitfieldDup( tr_cpPieceBitfield( tor->completion ) );
2022            tr_bitfieldDifference( tmp, peer->have );
2023            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2024            tr_bitfieldFree( tmp );
2025        }
2026        if( peerHasEverything
2027          && ( !tr_torrentAllowsPex( tor ) || ( now - atom->time >= 30 ) ) )
2028        {
2029            tordbg( t, "purging peer %s because we're both seeds",
2030                   tr_peerIoAddrStr( &atom->addr,
2031                                     atom->port ) );
2032            return TRUE;
2033        }
2034    }
2035
2036    /* disconnect if it's been too long since piece data has been transferred.
2037     * this is on a sliding scale based on number of available peers... */
2038    {
2039        const int    relaxStrictnessIfFewerThanN =
2040            (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2041        /* if we have >= relaxIfFewerThan, strictness is 100%.
2042         * if we have zero connections, strictness is 0% */
2043        const float  strictness = peerCount >= relaxStrictnessIfFewerThanN
2044                                  ? 1.0
2045                                  : peerCount /
2046                                  (float)relaxStrictnessIfFewerThanN;
2047        const int    lo = MIN_UPLOAD_IDLE_SECS;
2048        const int    hi = MAX_UPLOAD_IDLE_SECS;
2049        const int    limit = lo + ( ( hi - lo ) * strictness );
2050        const time_t then = peer->pieceDataActivityDate;
2051        const int    idleTime = then ? ( now - then ) : 0;
2052        if( idleTime > limit )
2053        {
2054            tordbg(
2055                t,
2056                "purging peer %s because it's been %d secs since we shared anything",
2057                tr_peerIoAddrStr( &atom->addr, atom->port ), idleTime );
2058            return TRUE;
2059        }
2060    }
2061
2062    return FALSE;
2063}
2064
2065static tr_peer **
2066getPeersToClose( Torrent * t,
2067                 int *     setmeSize )
2068{
2069    int               i, peerCount, outsize;
2070    tr_peer **        peers = (tr_peer**) tr_ptrArrayPeek( t->peers,
2071                                                           &peerCount );
2072    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2073
2074    assert( torrentIsLocked( t ) );
2075
2076    for( i = outsize = 0; i < peerCount; ++i )
2077        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
2078            ret[outsize++] = peers[i];
2079
2080    *setmeSize = outsize;
2081    return ret;
2082}
2083
2084static int
2085compareCandidates( const void * va,
2086                   const void * vb )
2087{
2088    const struct peer_atom * a = *(const struct peer_atom**) va;
2089    const struct peer_atom * b = *(const struct peer_atom**) vb;
2090
2091    /* <Charles> Here we would probably want to try reconnecting to
2092     * peers that had most recently given us data. Lots of users have
2093     * trouble with resets due to their routers and/or ISPs. This way we
2094     * can quickly recover from an unwanted reset. So we sort
2095     * piece_data_time in descending order.
2096     */
2097
2098    if( a->piece_data_time != b->piece_data_time )
2099        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2100
2101    if( a->numFails != b->numFails )
2102        return a->numFails < b->numFails ? -1 : 1;
2103
2104    if( a->time != b->time )
2105        return a->time < b->time ? -1 : 1;
2106
2107    return 0;
2108}
2109
2110static int
2111getReconnectIntervalSecs( const struct peer_atom * atom )
2112{
2113    int          sec;
2114    const time_t now = time( NULL );
2115
2116    /* if we were recently connected to this peer and transferring piece
2117     * data, try to reconnect to them sooner rather that later -- we don't
2118     * want network troubles to get in the way of a good peer. */
2119    if( ( now - atom->piece_data_time ) <=
2120       ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2121        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2122
2123    /* don't allow reconnects more often than our minimum */
2124    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2125        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2126
2127    /* otherwise, the interval depends on how many times we've tried
2128     * and failed to connect to the peer */
2129    else switch( atom->numFails )
2130        {
2131            case 0:
2132                sec = 0; break;
2133
2134            case 1:
2135                sec = 5; break;
2136
2137            case 2:
2138                sec = 2 * 60; break;
2139
2140            case 3:
2141                sec = 15 * 60; break;
2142
2143            case 4:
2144                sec = 30 * 60; break;
2145
2146            case 5:
2147                sec = 60 * 60; break;
2148
2149            default:
2150                sec = 120 * 60; break;
2151        }
2152
2153    return sec;
2154}
2155
2156static struct peer_atom **
2157getPeerCandidates(                               Torrent * t,
2158                                           int * setmeSize )
2159{
2160    int                 i, atomCount, retCount;
2161    struct peer_atom ** atoms;
2162    struct peer_atom ** ret;
2163    const time_t        now = time( NULL );
2164    const int           seed = tr_torrentIsSeed( t->tor );
2165
2166    assert( torrentIsLocked( t ) );
2167
2168    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &atomCount );
2169    ret = tr_new( struct peer_atom*, atomCount );
2170    for( i = retCount = 0; i < atomCount; ++i )
2171    {
2172        int                interval;
2173        struct peer_atom * atom = atoms[i];
2174
2175        /* peer fed us too much bad data ... we only keep it around
2176         * now to weed it out in case someone sends it to us via pex */
2177        if( atom->myflags & MYFLAG_BANNED )
2178            continue;
2179
2180        /* peer was unconnectable before, so we're not going to keep trying.
2181         * this is needs a separate flag from `banned', since if they try
2182         * to connect to us later, we'll let them in */
2183        if( atom->myflags & MYFLAG_UNREACHABLE )
2184            continue;
2185
2186        /* we don't need two connections to the same peer... */
2187        if( peerIsInUse( t, &atom->addr ) )
2188            continue;
2189
2190        /* no need to connect if we're both seeds... */
2191        if( seed && ( atom->flags & ADDED_F_SEED_FLAG ) )
2192            continue;
2193
2194        /* don't reconnect too often */
2195        interval = getReconnectIntervalSecs( atom );
2196        if( ( now - atom->time ) < interval )
2197        {
2198            tordbg(
2199                t,
2200                "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2201                i, tr_peerIoAddrStr( &atom->addr,
2202                                     atom->port ), interval );
2203            continue;
2204        }
2205
2206        /* Don't connect to peers in our blocklist */
2207        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2208            continue;
2209
2210        ret[retCount++] = atom;
2211    }
2212
2213    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2214    *setmeSize = retCount;
2215    return ret;
2216}
2217
2218static int
2219reconnectPulse( void * vtorrent )
2220{
2221    Torrent *     t = vtorrent;
2222    static time_t prevTime = 0;
2223    static int    newConnectionsThisSecond = 0;
2224    time_t        now;
2225
2226    torrentLock( t );
2227
2228    now = time( NULL );
2229    if( prevTime != now )
2230    {
2231        prevTime = now;
2232        newConnectionsThisSecond = 0;
2233    }
2234
2235    if( !t->isRunning )
2236    {
2237        removeAllPeers( t );
2238    }
2239    else
2240    {
2241        int                 i, nCandidates, nBad;
2242        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
2243        struct tr_peer **   connections = getPeersToClose( t, &nBad );
2244
2245        if( nBad || nCandidates )
2246            tordbg(
2247                t, "reconnect pulse for [%s]: %d bad connections, "
2248                   "%d connection candidates, %d atoms, max per pulse is %d",
2249                t->tor->info.name, nBad, nCandidates,
2250                tr_ptrArraySize( t->pool ),
2251                (int)MAX_RECONNECTIONS_PER_PULSE );
2252
2253        /* disconnect some peers.
2254           if we transferred piece data, then they might be good peers,
2255           so reset their `numFails' weight to zero.  otherwise we connected
2256           to them fruitlessly, so mark it as another fail */
2257        for( i = 0; i < nBad; ++i )
2258        {
2259            tr_peer *          peer = connections[i];
2260            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
2261            if( peer->pieceDataActivityDate )
2262                atom->numFails = 0;
2263            else
2264                ++atom->numFails;
2265            tordbg( t, "removing bad peer %s",
2266                   tr_peerIoGetAddrStr( peer->io ) );
2267            removePeer( t, peer );
2268        }
2269
2270        /* add some new ones */
2271        for( i = 0;    ( i < nCandidates )
2272           && ( i < MAX_RECONNECTIONS_PER_PULSE )
2273           && ( getPeerCount( t ) < getMaxPeerCount( t->tor ) )
2274           && ( newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND );
2275             ++i )
2276        {
2277            tr_peerMgr *       mgr = t->manager;
2278            struct peer_atom * atom = candidates[i];
2279            tr_peerIo *        io;
2280
2281            tordbg( t, "Starting an OUTGOING connection with %s",
2282                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2283
2284            io =
2285                tr_peerIoNewOutgoing( mgr->session, &atom->addr, atom->port,
2286                                      t->hash );
2287            if( io == NULL )
2288            {
2289                atom->myflags |= MYFLAG_UNREACHABLE;
2290            }
2291            else
2292            {
2293                tr_handshake * handshake = tr_handshakeNew(
2294                    io,
2295                    mgr->session->
2296                    encryptionMode,
2297                    myHandshakeDoneCB,
2298                    mgr );
2299
2300                assert( tr_peerIoGetTorrentHash( io ) );
2301
2302                ++newConnectionsThisSecond;
2303
2304                tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake,
2305                                         handshakeCompare );
2306            }
2307
2308            atom->time = time( NULL );
2309        }
2310
2311        /* cleanup */
2312        tr_free( connections );
2313        tr_free( candidates );
2314    }
2315
2316    torrentUnlock( t );
2317    return TRUE;
2318}
2319
2320/****
2321*****
2322*****  BANDWIDTH ALLOCATION
2323*****
2324****/
2325
2326static double
2327allocateHowMuch( double         desiredAvgKB,
2328                 const double * history )
2329{
2330    const double baseline = desiredAvgKB * 1024.0 /
2331                            BANDWIDTH_PULSES_PER_SECOND;
2332    const double min = baseline * 0.85;
2333    const double max = baseline * 1.15;
2334    int          i;
2335    double       usedBytes;
2336    double       n;
2337    double       clamped;
2338
2339    for( usedBytes = i = 0; i < BANDWIDTH_PULSE_HISTORY; ++i )
2340        usedBytes += history[i];
2341
2342    n = ( desiredAvgKB * 1024.0 )
2343      * ( BANDWIDTH_PULSE_HISTORY + 1.0 )
2344      / BANDWIDTH_PULSES_PER_SECOND
2345      - usedBytes;
2346
2347    /* clamp the return value to lessen oscillation */
2348    clamped = n;
2349    clamped = MAX( clamped, min );
2350    clamped = MIN( clamped, max );
2351/*fprintf( stderr, "desiredAvgKB is %.2f, rate is %.2f, allocating %.2f
2352  (%.2f)\n", desiredAvgKB,
2353  ((usedBytes*BANDWIDTH_PULSES_PER_SECOND)/BANDWIDTH_PULSE_HISTORY)/1024.0,
2354  clamped/1024.0, n/1024.0 );*/
2355    return clamped;
2356}
2357
2358/**
2359 * Distributes a fixed amount of bandwidth among a set of peers.
2360 *
2361 * @param peerArray peers whose client-to-peer bandwidth will be set
2362 * @param direction whether to allocate upload or download bandwidth
2363 * @param history recent bandwidth history for these peers
2364 * @param desiredAvgKB overall bandwidth goal for this set of peers
2365 */
2366static void
2367setPeerBandwidth( tr_ptrArray *      peerArray,
2368                  const tr_direction direction,
2369                  const double *     history,
2370                  double             desiredAvgKB )
2371{
2372    const int    peerCount = tr_ptrArraySize( peerArray );
2373    const double bytes = allocateHowMuch( desiredAvgKB, history );
2374    const double welfareBytes = MIN( 2048, bytes * 0.2 );
2375    const double meritBytes = MAX( 0, bytes - welfareBytes );
2376    tr_peer **   peers = (tr_peer**) tr_ptrArrayBase( peerArray );
2377    tr_peer **   candidates = tr_new( tr_peer *, peerCount );
2378    int          i;
2379    int          candidateCount;
2380    double       welfare;
2381    size_t       bytesUsed;
2382
2383    assert( meritBytes >= 0.0 );
2384    assert( welfareBytes >= 0.0 );
2385    assert( direction == TR_UP || direction == TR_DOWN );
2386
2387    for( i = candidateCount = 0; i < peerCount; ++i )
2388        if( tr_peerIoWantsBandwidth( peers[i]->io, direction ) )
2389            candidates[candidateCount++] = peers[i];
2390        else
2391            tr_peerIoSetBandwidth( peers[i]->io, direction, 0 );
2392
2393    for( i = bytesUsed = 0; i < candidateCount; ++i )
2394        bytesUsed += tr_peerIoGetBandwidthUsed( candidates[i]->io,
2395                                                direction );
2396
2397    welfare = welfareBytes / candidateCount;
2398
2399    for( i = 0; i < candidateCount; ++i )
2400    {
2401        tr_peer *    peer = candidates[i];
2402        const double merit = bytesUsed
2403                             ? ( meritBytes *
2404                                tr_peerIoGetBandwidthUsed( peer->io,
2405                                                           direction ) ) /
2406                             bytesUsed
2407                             : ( meritBytes / candidateCount );
2408        tr_peerIoSetBandwidth( peer->io, direction, merit + welfare );
2409    }
2410
2411    /* cleanup */
2412    tr_free( candidates );
2413}
2414
2415static size_t
2416countHandshakeBandwidth( tr_ptrArray * handshakes,
2417                         tr_direction  direction )
2418{
2419    const int n = tr_ptrArraySize( handshakes );
2420    int       i;
2421    size_t    total;
2422
2423    for( i = total = 0; i < n; ++i )
2424    {
2425        tr_peerIo * io = tr_handshakeGetIO( tr_ptrArrayNth( handshakes, i ) );
2426        total += tr_peerIoGetBandwidthUsed( io, direction );
2427    }
2428    return total;
2429}
2430
2431static size_t
2432countPeerBandwidth( tr_ptrArray * peers,
2433                    tr_direction  direction )
2434{
2435    const int n = tr_ptrArraySize( peers );
2436    int       i;
2437    size_t    total;
2438
2439    for( i = total = 0; i < n; ++i )
2440    {
2441        tr_peer * peer = tr_ptrArrayNth( peers, i );
2442        total += tr_peerIoGetBandwidthUsed( peer->io, direction );
2443    }
2444    return total;
2445}
2446
2447static void
2448givePeersUnlimitedBandwidth( tr_ptrArray * peers,
2449                             tr_direction  direction )
2450{
2451    const int n = tr_ptrArraySize( peers );
2452    int       i;
2453
2454    for( i = 0; i < n; ++i )
2455    {
2456        tr_peer * peer = tr_ptrArrayNth( peers, i );
2457        tr_peerIoSetBandwidthUnlimited( peer->io, direction );
2458    }
2459}
2460
2461static void
2462pumpAllPeers( tr_peerMgr * mgr )
2463{
2464    const int torrentCount = tr_ptrArraySize( mgr->torrents );
2465    int       i, j;
2466
2467    for( i = 0; i < torrentCount; ++i )
2468    {
2469        Torrent * t = tr_ptrArrayNth( mgr->torrents, i );
2470        for( j = 0; j < tr_ptrArraySize( t->peers ); ++j )
2471        {
2472            tr_peer * peer = tr_ptrArrayNth( t->peers, j );
2473            tr_peerMsgsPulse( peer->msgs );
2474        }
2475    }
2476}
2477
2478/**
2479 * Allocate bandwidth for each peer connection.
2480 *
2481 * @param mgr the peer manager
2482 * @param direction whether to allocate upload or download bandwidth
2483 * @return the amount of directional bandwidth used since the last pulse.
2484 */
2485static double
2486allocateBandwidth( tr_peerMgr * mgr,
2487                   tr_direction direction )
2488{
2489    tr_session *  session = mgr->session;
2490    const int     pulseNumber = mgr->bandwidthPulseNumber;
2491    const int     torrentCount = tr_ptrArraySize( mgr->torrents );
2492    Torrent **    torrents = (Torrent **) tr_ptrArrayBase( mgr->torrents );
2493    tr_ptrArray * globalPool = tr_ptrArrayNew( );
2494    double        allBytesUsed = 0;
2495    size_t        poolBytesUsed = 0;
2496    int           i;
2497
2498    assert( mgr );
2499    assert( direction == TR_UP || direction == TR_DOWN );
2500
2501    /* before allocating bandwidth, pump the connected peers */
2502    pumpAllPeers( mgr );
2503
2504    for( i = 0; i < torrentCount; ++i )
2505    {
2506        Torrent *    t = torrents[i];
2507        const size_t used = countPeerBandwidth( t->peers, direction );
2508        countHandshakeBandwidth( t->outgoingHandshakes, direction );
2509
2510        /* remember this torrent's bytes used */
2511        t->tor->rateHistory[direction][pulseNumber] = used;
2512
2513        /* add this torrent's bandwidth use to allBytesUsed */
2514        allBytesUsed += used;
2515
2516        /* process the torrent's peers based on its speed mode */
2517        switch( tr_torrentGetSpeedMode( t->tor, direction ) )
2518        {
2519            case TR_SPEEDLIMIT_UNLIMITED:
2520                givePeersUnlimitedBandwidth( t->peers, direction );
2521                break;
2522
2523            case TR_SPEEDLIMIT_SINGLE:
2524                setPeerBandwidth( t->peers, direction,
2525                                  t->tor->rateHistory[direction],
2526                                  tr_torrentGetSpeedLimit( t->tor,
2527                                                           direction ) );
2528                break;
2529
2530            case TR_SPEEDLIMIT_GLOBAL:
2531            {
2532                int       i;
2533                const int n = tr_ptrArraySize( t->peers );
2534                for( i = 0; i < n; ++i )
2535                    tr_ptrArrayAppend( globalPool,
2536                                      tr_ptrArrayNth( t->peers, i ) );
2537                poolBytesUsed += used;
2538                break;
2539            }
2540        }
2541    }
2542
2543    /* add incoming handshakes to the global pool */
2544    i = countHandshakeBandwidth( mgr->incomingHandshakes, direction );
2545    allBytesUsed += i;
2546    poolBytesUsed += i;
2547
2548    mgr->globalPoolHistory[direction][pulseNumber] = poolBytesUsed;
2549
2550    /* handle the global pool's connections */
2551    if( !tr_sessionIsSpeedLimitEnabled( session, direction ) )
2552        givePeersUnlimitedBandwidth( globalPool, direction );
2553    else
2554        setPeerBandwidth( globalPool, direction,
2555                         mgr->globalPoolHistory[direction],
2556                         tr_sessionGetSpeedLimit( session, direction ) );
2557
2558    /* now that we've allocated bandwidth, pump all the connected peers */
2559    pumpAllPeers( mgr );
2560
2561    /* cleanup */
2562    tr_ptrArrayFree( globalPool, NULL );
2563    return allBytesUsed;
2564}
2565
2566static int
2567bandwidthPulse( void * vmgr )
2568{
2569    tr_peerMgr * mgr = vmgr;
2570    int          i;
2571
2572    managerLock( mgr );
2573
2574    /* keep track of how far we are into the cycle */
2575    if( ++mgr->bandwidthPulseNumber == BANDWIDTH_PULSE_HISTORY )
2576        mgr->bandwidthPulseNumber = 0;
2577
2578    /* allocate the upload and download bandwidth */
2579    for( i = 0; i < 2; ++i )
2580        allocateBandwidth( mgr, i );
2581
2582    managerUnlock( mgr );
2583    return TRUE;
2584}
2585
Note: See TracBrowser for help on using the repository browser.