source: trunk/libtransmission/peer-mgr.c @ 7064

Last change on this file since 7064 was 7064, checked in by charles, 13 years ago

#1429 (libT) cleaner handling of the special case where the upload or download speed limit is zero

  • Property svn:keywords set to Date Rev Author Id
File size: 73.7 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 7064 2008-11-07 04:10:27Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h> /* memcpy, memcmp, strstr */
16#include <stdlib.h> /* qsort */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "blocklist.h"
23#include "clients.h"
24#include "completion.h"
25#include "crypto.h"
26#include "handshake.h"
27#include "inout.h" /* tr_ioTestPiece */
28#include "net.h"
29#include "peer-io.h"
30#include "peer-mgr.h"
31#include "peer-mgr-private.h"
32#include "peer-msgs.h"
33#include "ptrarray.h"
34#include "ratecontrol.h"
35#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
36#include "torrent.h"
37#include "trevent.h"
38#include "utils.h"
39#include "webseed.h"
40
41enum
42{
43    /* how frequently to change which peers are choked */
44    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
45
46    /* minimum interval for refilling peers' request lists */
47    REFILL_PERIOD_MSEC = 333,
48
49    /* when many peers are available, keep idle ones this long */
50    MIN_UPLOAD_IDLE_SECS = ( 60 * 3 ),
51
52    /* when few peers are available, keep idle ones this long */
53    MAX_UPLOAD_IDLE_SECS = ( 60 * 10 ),
54
55    /* how frequently to decide which peers live and die */
56    RECONNECT_PERIOD_MSEC = ( 2 * 1000 ),
57
58    /* max # of peers to ask fer per torrent per reconnect pulse */
59    MAX_RECONNECTIONS_PER_PULSE = 2,
60
61    /* max number of peers to ask for per second overall.
62    * this throttle is to avoid overloading the router */
63    MAX_CONNECTIONS_PER_SECOND = 4,
64
65    /* number of unchoked peers per torrent.
66     * FIXME: this probably ought to be configurable */
67    MAX_UNCHOKED_PEERS = 12,
68
69    /* number of bad pieces a peer is allowed to send before we ban them */
70    MAX_BAD_PIECES_PER_PEER = 3,
71
72    /* use for bitwise operations w/peer_atom.myflags */
73    MYFLAG_BANNED = 1,
74
75    /* unreachable for now... but not banned.
76     * if they try to connect to us it's okay */
77    MYFLAG_UNREACHABLE = 2,
78
79    /* the minimum we'll wait before attempting to reconnect to a peer */
80    MINIMUM_RECONNECT_INTERVAL_SECS = 5
81};
82
83
84/**
85***
86**/
87
88/* We keep one of these for every peer we know about, whether
89 * it's connected or not, so the struct must be small.
90 * When our current connections underperform, we dip back
91 * into this list for new ones. */
92struct peer_atom
93{
94    uint8_t           from;
95    uint8_t           flags; /* these match the added_f flags */
96    uint8_t           myflags; /* flags that aren't defined in added_f */
97    uint16_t          port;
98    uint16_t          numFails;
99    struct in_addr    addr;
100    time_t            time; /* when the peer's connection status last changed */
101    time_t            piece_data_time;
102};
103
104typedef struct
105{
106    unsigned int    isRunning : 1;
107
108    uint8_t         hash[SHA_DIGEST_LENGTH];
109    int         *   pendingRequestCount;
110    tr_ptrArray *   outgoingHandshakes; /* tr_handshake */
111    tr_ptrArray *   pool; /* struct peer_atom */
112    tr_ptrArray *   peers; /* tr_peer */
113    tr_ptrArray *   webseeds; /* tr_webseed */
114    tr_timer *      reconnectTimer;
115    tr_timer *      rechokeTimer;
116    tr_timer *      refillTimer;
117    tr_torrent *    tor;
118    tr_peer *       optimistic; /* the optimistic peer, or NULL if none */
119
120    struct tr_peerMgr * manager;
121}
122Torrent;
123
124struct tr_peerMgr
125{
126    uint8_t        bandwidthPulseNumber;
127    tr_session *   session;
128    tr_ptrArray *  torrents; /* Torrent */
129    tr_ptrArray *  incomingHandshakes; /* tr_handshake */
130    tr_timer *     bandwidthTimer;
131    double         globalPoolHistory[2][BANDWIDTH_PULSE_HISTORY];
132};
133
134#define tordbg( t, ... ) \
135    do { \
136        if( tr_deepLoggingIsActive( ) ) \
137            tr_deepLog( __FILE__, __LINE__, t->tor->info.name, __VA_ARGS__ ); \
138    } while( 0 )
139
140#define dbgmsg( ... ) \
141    do { \
142        if( tr_deepLoggingIsActive( ) ) \
143            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
144    } while( 0 )
145
146/**
147***
148**/
149
150static void
151managerLock( const struct tr_peerMgr * manager )
152{
153    tr_globalLock( manager->session );
154}
155
156static void
157managerUnlock( const struct tr_peerMgr * manager )
158{
159    tr_globalUnlock( manager->session );
160}
161
162static void
163torrentLock( Torrent * torrent )
164{
165    managerLock( torrent->manager );
166}
167
168static void
169torrentUnlock( Torrent * torrent )
170{
171    managerUnlock( torrent->manager );
172}
173
174static int
175torrentIsLocked( const Torrent * t )
176{
177    return tr_globalIsLocked( t->manager->session );
178}
179
180/**
181***
182**/
183
184static int
185compareAddresses( const struct in_addr * a,
186                  const struct in_addr * b )
187{
188    if( a->s_addr != b->s_addr )
189        return a->s_addr < b->s_addr ? -1 : 1;
190
191    return 0;
192}
193
194static int
195handshakeCompareToAddr( const void * va,
196                        const void * vb )
197{
198    const tr_handshake * a = va;
199
200    return compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
201}
202
203static int
204handshakeCompare( const void * a,
205                  const void * b )
206{
207    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
208}
209
210static tr_handshake*
211getExistingHandshake( tr_ptrArray *          handshakes,
212                      const struct in_addr * in_addr )
213{
214    return tr_ptrArrayFindSorted( handshakes,
215                                  in_addr,
216                                  handshakeCompareToAddr );
217}
218
219static int
220comparePeerAtomToAddress( const void * va,
221                          const void * vb )
222{
223    const struct peer_atom * a = va;
224
225    return compareAddresses( &a->addr, vb );
226}
227
228static int
229comparePeerAtoms( const void * va,
230                  const void * vb )
231{
232    const struct peer_atom * b = vb;
233
234    return comparePeerAtomToAddress( va, &b->addr );
235}
236
237/**
238***
239**/
240
241static int
242torrentCompare( const void * va,
243                const void * vb )
244{
245    const Torrent * a = va;
246    const Torrent * b = vb;
247
248    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
249}
250
251static int
252torrentCompareToHash( const void * va,
253                      const void * vb )
254{
255    const Torrent * a = va;
256    const uint8_t * b_hash = vb;
257
258    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
259}
260
261static Torrent*
262getExistingTorrent( tr_peerMgr *    manager,
263                    const uint8_t * hash )
264{
265    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
266                                             hash,
267                                             torrentCompareToHash );
268}
269
270static int
271peerCompare( const void * va,
272             const void * vb )
273{
274    const tr_peer * a = va;
275    const tr_peer * b = vb;
276
277    return compareAddresses( &a->in_addr, &b->in_addr );
278}
279
280static int
281peerCompareToAddr( const void * va,
282                   const void * vb )
283{
284    const tr_peer * a = va;
285
286    return compareAddresses( &a->in_addr, vb );
287}
288
289static tr_peer*
290getExistingPeer( Torrent *              torrent,
291                 const struct in_addr * in_addr )
292{
293    assert( torrentIsLocked( torrent ) );
294    assert( in_addr );
295
296    return tr_ptrArrayFindSorted( torrent->peers,
297                                  in_addr,
298                                  peerCompareToAddr );
299}
300
301static struct peer_atom*
302getExistingAtom( const                  Torrent * t,
303                 const struct in_addr * addr )
304{
305    assert( torrentIsLocked( t ) );
306    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
307}
308
309static int
310peerIsInUse( const Torrent *        ct,
311             const struct in_addr * addr )
312{
313    Torrent * t = (Torrent*) ct;
314
315    assert( torrentIsLocked ( t ) );
316
317    return getExistingPeer( t, addr )
318           || getExistingHandshake( t->outgoingHandshakes, addr )
319           || getExistingHandshake( t->manager->incomingHandshakes, addr );
320}
321
322static tr_peer*
323peerConstructor( const struct in_addr * in_addr )
324{
325    tr_peer * p;
326
327    p = tr_new0( tr_peer, 1 );
328    memcpy( &p->in_addr, in_addr, sizeof( struct in_addr ) );
329    p->pieceSpeed[TR_CLIENT_TO_PEER] = tr_rcInit( );
330    p->pieceSpeed[TR_PEER_TO_CLIENT] = tr_rcInit( );
331    return p;
332}
333
334static tr_peer*
335getPeer( Torrent *              torrent,
336         const struct in_addr * in_addr )
337{
338    tr_peer * peer;
339
340    assert( torrentIsLocked( torrent ) );
341
342    peer = getExistingPeer( torrent, in_addr );
343
344    if( peer == NULL )
345    {
346        peer = peerConstructor( in_addr );
347        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
348    }
349
350    return peer;
351}
352
353static void
354peerDestructor( tr_peer * peer )
355{
356    assert( peer );
357    assert( peer->msgs );
358
359    tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
360    tr_peerMsgsFree( peer->msgs );
361
362    tr_peerIoFree( peer->io );
363
364    tr_bitfieldFree( peer->have );
365    tr_bitfieldFree( peer->blame );
366    tr_free( peer->client );
367
368    tr_rcClose( peer->pieceSpeed[TR_CLIENT_TO_PEER] );
369    tr_rcClose( peer->pieceSpeed[TR_PEER_TO_CLIENT] );
370    tr_free( peer );
371}
372
373static void
374removePeer( Torrent * t,
375            tr_peer * peer )
376{
377    tr_peer *          removed;
378    struct peer_atom * atom;
379
380    assert( torrentIsLocked( t ) );
381
382    atom = getExistingAtom( t, &peer->in_addr );
383    assert( atom );
384    atom->time = time( NULL );
385
386    removed = tr_ptrArrayRemoveSorted( t->peers, peer, peerCompare );
387    assert( removed == peer );
388    peerDestructor( removed );
389}
390
391static void
392removeAllPeers( Torrent * t )
393{
394    while( !tr_ptrArrayEmpty( t->peers ) )
395        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
396}
397
398static void
399torrentDestructor( void * vt )
400{
401    Torrent * t = vt;
402    uint8_t   hash[SHA_DIGEST_LENGTH];
403
404    assert( t );
405    assert( !t->isRunning );
406    assert( t->peers );
407    assert( torrentIsLocked( t ) );
408    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
409    assert( tr_ptrArrayEmpty( t->peers ) );
410
411    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
412
413    tr_timerFree( &t->reconnectTimer );
414    tr_timerFree( &t->rechokeTimer );
415    tr_timerFree( &t->refillTimer );
416
417    tr_ptrArrayFree( t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
418    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
419    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
420    tr_ptrArrayFree( t->peers, NULL );
421
422    tr_free( t->pendingRequestCount );
423    tr_free( t );
424}
425
426static void peerCallbackFunc( void * vpeer,
427                              void * vevent,
428                              void * vt );
429
430static Torrent*
431torrentConstructor( tr_peerMgr * manager,
432                    tr_torrent * tor )
433{
434    int       i;
435    Torrent * t;
436
437    t = tr_new0( Torrent, 1 );
438    t->manager = manager;
439    t->tor = tor;
440    t->pool = tr_ptrArrayNew( );
441    t->peers = tr_ptrArrayNew( );
442    t->webseeds = tr_ptrArrayNew( );
443    t->outgoingHandshakes = tr_ptrArrayNew( );
444    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
445
446    for( i = 0; i < tor->info.webseedCount; ++i )
447    {
448        tr_webseed * w =
449            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc,
450                           t );
451        tr_ptrArrayAppend( t->webseeds, w );
452    }
453
454    return t;
455}
456
457/**
458 * For explanation, see http://www.bittorrent.org/fast_extensions.html
459 * Also see the "test-allowed-set" unit test
460 *
461 * @param k number of pieces in set
462 * @param sz number of pieces in the torrent
463 * @param infohash torrent's SHA1 hash
464 * @param ip peer's address
465 */
466struct tr_bitfield *
467tr_peerMgrGenerateAllowedSet(
468    const uint32_t         k,
469    const uint32_t         sz,
470    const                  uint8_t        *
471                           infohash,
472    const struct in_addr * ip )
473{
474    uint8_t       w[SHA_DIGEST_LENGTH + 4];
475    uint8_t       x[SHA_DIGEST_LENGTH];
476    tr_bitfield * a;
477    uint32_t      a_size;
478
479    *(uint32_t*)w = ntohl( htonl( ip->s_addr ) & 0xffffff00 );   /* (1) */
480    memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );              /* (2) */
481    tr_sha1( x, w, sizeof( w ), NULL );                        /* (3) */
482
483    a = tr_bitfieldNew( sz );
484    a_size = 0;
485
486    while( a_size < k )
487    {
488        int i;
489        for( i = 0; i < 5 && a_size < k; ++i )                      /* (4) */
490        {
491            uint32_t j = i * 4;                                /* (5) */
492            uint32_t y = ntohl( *( uint32_t* )( x + j ) );             /* (6) */
493            uint32_t index = y % sz;                           /* (7) */
494            if( !tr_bitfieldHas( a, index ) )                  /* (8) */
495            {
496                tr_bitfieldAdd( a, index );                    /* (9) */
497                ++a_size;
498            }
499        }
500        tr_sha1( x, x, sizeof( x ), NULL );                    /* (3) */
501    }
502
503    return a;
504}
505
506static int bandwidthPulse( void * vmgr );
507
508tr_peerMgr*
509tr_peerMgrNew( tr_session * session )
510{
511    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
512
513    m->session = session;
514    m->torrents = tr_ptrArrayNew( );
515    m->incomingHandshakes = tr_ptrArrayNew( );
516    m->bandwidthPulseNumber = -1;
517    m->bandwidthTimer = tr_timerNew( session, bandwidthPulse,
518                                     m, 1000 / BANDWIDTH_PULSES_PER_SECOND );
519    return m;
520}
521
522void
523tr_peerMgrFree( tr_peerMgr * manager )
524{
525    managerLock( manager );
526
527    tr_timerFree( &manager->bandwidthTimer );
528
529    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
530     * the item from manager->handshakes, so this is a little roundabout... */
531    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
532        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
533
534    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
535
536    /* free the torrents. */
537    tr_ptrArrayFree( manager->torrents, torrentDestructor );
538
539    managerUnlock( manager );
540    tr_free( manager );
541}
542
543static tr_peer**
544getConnectedPeers( Torrent * t,
545                   int *     setmeCount )
546{
547    int       i, peerCount, connectionCount;
548    tr_peer **peers;
549    tr_peer **ret;
550
551    assert( torrentIsLocked( t ) );
552
553    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
554    ret = tr_new( tr_peer *, peerCount );
555
556    for( i = connectionCount = 0; i < peerCount; ++i )
557        if( peers[i]->msgs )
558            ret[connectionCount++] = peers[i];
559
560    *setmeCount = connectionCount;
561    return ret;
562}
563
564static int
565clientIsDownloadingFrom( const tr_peer * peer )
566{
567    return peer->clientIsInterested && !peer->clientIsChoked;
568}
569
570static int
571clientIsUploadingTo( const tr_peer * peer )
572{
573    return peer->peerIsInterested && !peer->peerIsChoked;
574}
575
576/***
577****
578***/
579
580int
581tr_peerMgrPeerIsSeed( const tr_peerMgr *     mgr,
582                      const uint8_t *        torrentHash,
583                      const struct in_addr * addr )
584{
585    int                      isSeed = FALSE;
586    const Torrent *          t = NULL;
587    const struct peer_atom * atom = NULL;
588
589    t = getExistingTorrent( (tr_peerMgr*)mgr, torrentHash );
590    if( t )
591        atom = getExistingAtom( t, addr );
592    if( atom )
593        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
594
595    return isSeed;
596}
597
598/****
599*****
600*****  REFILL
601*****
602****/
603
604static void
605assertValidPiece( Torrent * t, tr_piece_index_t piece )
606{
607    assert( t );
608    assert( t->tor );
609    assert( piece < t->tor->info.pieceCount );
610}
611
612static int
613getPieceRequests( Torrent * t, tr_piece_index_t piece )
614{
615    assertValidPiece( t, piece );
616
617    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
618}
619
620static void
621incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
622{
623    assertValidPiece( t, piece );
624
625    if( t->pendingRequestCount == NULL )
626        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
627    t->pendingRequestCount[piece]++;
628}
629
630static void
631decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
632{
633    assertValidPiece( t, piece );
634
635    if( t->pendingRequestCount )
636        t->pendingRequestCount[piece]--;
637}
638
639struct tr_refill_piece
640{
641    tr_priority_t    priority;
642    uint32_t         piece;
643    uint32_t         peerCount;
644    int              random;
645    int              pendingRequestCount;
646    int              missingBlockCount;
647};
648
649static int
650compareRefillPiece( const void * aIn,
651                    const void * bIn )
652{
653    const struct tr_refill_piece * a = aIn;
654    const struct tr_refill_piece * b = bIn;
655
656    /* if one piece has a higher priority, it goes first */
657    if( a->priority != b->priority )
658        return a->priority > b->priority ? -1 : 1;
659
660    /* have a per-priority endgame */
661    if( a->pendingRequestCount != b->pendingRequestCount )
662        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
663
664    /* fewer missing pieces goes first */
665    if( a->missingBlockCount != b->missingBlockCount )
666        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
667
668    /* otherwise if one has fewer peers, it goes first */
669    if( a->peerCount != b->peerCount )
670        return a->peerCount < b->peerCount ? -1 : 1;
671
672    /* otherwise go with our random seed */
673    if( a->random != b->random )
674        return a->random < b->random ? -1 : 1;
675
676    return 0;
677}
678
679static tr_piece_index_t *
680getPreferredPieces( Torrent           * t,
681                    tr_piece_index_t  * pieceCount )
682{
683    const tr_torrent  * tor = t->tor;
684    const tr_info     * inf = &tor->info;
685    tr_piece_index_t    i;
686    tr_piece_index_t    poolSize = 0;
687    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
688    int                 peerCount;
689    tr_peer**           peers;
690
691    assert( torrentIsLocked( t ) );
692
693    peers = getConnectedPeers( t, &peerCount );
694
695    /* make a list of the pieces that we want but don't have */
696    for( i = 0; i < inf->pieceCount; ++i )
697        if( !tor->info.pieces[i].dnd
698                && !tr_cpPieceIsComplete( tor->completion, i ) )
699            pool[poolSize++] = i;
700
701    /* sort the pool by which to request next */
702    if( poolSize > 1 )
703    {
704        tr_piece_index_t j;
705        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
706
707        for( j = 0; j < poolSize; ++j )
708        {
709            int k;
710            const tr_piece_index_t piece = pool[j];
711            struct tr_refill_piece * setme = p + j;
712
713            setme->piece = piece;
714            setme->priority = inf->pieces[piece].priority;
715            setme->peerCount = 0;
716            setme->random = tr_cryptoWeakRandInt( INT_MAX );
717            setme->pendingRequestCount = getPieceRequests( t, piece );
718            setme->missingBlockCount
719                         = tr_cpMissingBlocksInPiece( tor->completion, piece );
720
721            for( k = 0; k < peerCount; ++k )
722            {
723                const tr_peer * peer = peers[k];
724                if( peer->peerIsInterested
725                        && !peer->clientIsChoked
726                        && tr_bitfieldHas( peer->have, piece ) )
727                    ++setme->peerCount;
728            }
729        }
730
731        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
732               compareRefillPiece );
733
734        for( j = 0; j < poolSize; ++j )
735            pool[j] = p[j].piece;
736
737        tr_free( p );
738    }
739
740    tr_free( peers );
741
742    *pieceCount = poolSize;
743    return pool;
744}
745
746struct tr_blockIterator
747{
748    Torrent * t;
749    tr_block_index_t blockIndex, blockCount, *blocks;
750    tr_piece_index_t pieceIndex, pieceCount, *pieces;
751};
752
753static struct tr_blockIterator*
754blockIteratorNew( Torrent * t )
755{
756    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
757    i->t = t;
758    i->pieces = getPreferredPieces( t, &i->pieceCount );
759    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCount );
760    return i;
761}
762
763static int
764blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
765{
766    int found;
767    Torrent * t = i->t;
768    tr_torrent * tor = t->tor;
769
770    while( ( i->blockIndex == i->blockCount )
771        && ( i->pieceIndex < i->pieceCount ) )
772    {
773        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
774        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
775        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
776        tr_block_index_t block;
777
778        assert( index < tor->info.pieceCount );
779
780        i->blockCount = 0;
781        i->blockIndex = 0;
782        for( block=b; block!=e; ++block )
783            if( !tr_cpBlockIsComplete( tor->completion, block ) )
784                i->blocks[i->blockCount++] = block;
785    }
786
787    if(( found = ( i->blockIndex < i->blockCount )))
788        *setme = i->blocks[i->blockIndex++];
789
790    return found;
791}
792
793static void
794blockIteratorFree( struct tr_blockIterator * i )
795{
796    tr_free( i->blocks );
797    tr_free( i->pieces );
798    tr_free( i );
799}
800
801static tr_peer**
802getPeersUploadingToClient( Torrent * t,
803                           int *     setmeCount )
804{
805    int j;
806    int peerCount = 0;
807    int retCount = 0;
808    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
809    tr_peer ** ret = tr_new( tr_peer *, peerCount );
810
811    j = 0; /* this is a temporary test to make sure we walk through all the peers */
812    if( peerCount )
813    {
814        /* Get a list of peers we're downloading from.
815           Pick a different starting point each time so all peers
816           get a chance at being the first in line */
817        const int fencepost = tr_cryptoWeakRandInt( peerCount );
818        int i = fencepost;
819        do {
820            if( clientIsDownloadingFrom( peers[i] ) )
821                ret[retCount++] = peers[i];
822            i = ( i + 1 ) % peerCount;
823            ++j;
824        } while( i != fencepost );
825    }
826    assert( j == peerCount );
827    *setmeCount = retCount;
828    return ret;
829}
830
831static uint32_t
832getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
833{
834    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
835    const uint64_t blockPos = tor->blockSize * b;
836    assert( blockPos >= piecePos );
837    return (uint32_t)( blockPos - piecePos );
838}
839
840static int
841refillPulse( void * vtorrent )
842{
843    tr_block_index_t block;
844    int peerCount;
845    int webseedCount;
846    tr_peer ** peers;
847    tr_webseed ** webseeds;
848    struct tr_blockIterator * blockIterator;
849    Torrent * t = vtorrent;
850    tr_torrent * tor = t->tor;
851
852    if( !t->isRunning )
853        return TRUE;
854    if( tr_torrentIsSeed( t->tor ) )
855        return TRUE;
856
857    torrentLock( t );
858    tordbg( t, "Refilling Request Buffers..." );
859
860    blockIterator = blockIteratorNew( t );
861    peers = getPeersUploadingToClient( t, &peerCount );
862    webseedCount = tr_ptrArraySize( t->webseeds );
863    webseeds = tr_memdup( tr_ptrArrayBase( t->webseeds ),
864                          webseedCount * sizeof( tr_webseed* ) );
865
866    while( ( webseedCount || peerCount )
867        && blockIteratorNext( blockIterator, &block ) )
868    {
869        int j;
870        int handled = FALSE;
871
872        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
873        const uint32_t offset = getBlockOffsetInPiece( tor, block );
874        const uint32_t length = tr_torBlockCountBytes( tor, block );
875
876        /* find a peer who can ask for this block */
877        for( j=0; !handled && j<peerCount; )
878        {
879            const int val = tr_peerMsgsAddRequest( peers[j]->msgs,
880                                                   index, offset, length );
881            switch( val )
882            {
883                case TR_ADDREQ_FULL:
884                case TR_ADDREQ_CLIENT_CHOKED:
885                    peers[j] = peers[--peerCount];
886                    break;
887
888                case TR_ADDREQ_MISSING:
889                case TR_ADDREQ_DUPLICATE:
890                    ++j;
891                    break;
892
893                case TR_ADDREQ_OK:
894                    incrementPieceRequests( t, index );
895                    handled = TRUE;
896                    break;
897
898                default:
899                    assert( 0 && "unhandled value" );
900                    break;
901            }
902        }
903
904        /* maybe one of the webseeds can do it */
905        for( j=0; !handled && j<webseedCount; )
906        {
907            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j],
908                                                          index, offset, length );
909            switch( val )
910            {
911                case TR_ADDREQ_FULL:
912                    webseeds[j] = webseeds[--webseedCount];
913                    break;
914
915                case TR_ADDREQ_OK:
916                    incrementPieceRequests( t, index );
917                    handled = TRUE;
918                    break;
919
920                default:
921                    assert( 0 && "unhandled value" );
922                    break;
923            }
924        }
925    }
926
927    /* cleanup */
928    blockIteratorFree( blockIterator );
929    tr_free( webseeds );
930    tr_free( peers );
931
932    t->refillTimer = NULL;
933    torrentUnlock( t );
934    return FALSE;
935}
936
937static void
938broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
939{
940    int i, size;
941    tr_peer ** peers;
942
943    assert( torrentIsLocked( t ) );
944
945    peers = getConnectedPeers( t, &size );
946    for( i=0; i<size; ++i )
947        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
948    tr_free( peers );
949}
950
951static void
952addStrike( Torrent * t,
953           tr_peer * peer )
954{
955    tordbg( t, "increasing peer %s strike count to %d",
956            tr_peerIoAddrStr( &peer->in_addr,
957                              peer->port ), peer->strikes + 1 );
958
959    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
960    {
961        struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
962        atom->myflags |= MYFLAG_BANNED;
963        peer->doPurge = 1;
964        tordbg( t, "banning peer %s",
965               tr_peerIoAddrStr( &atom->addr, atom->port ) );
966    }
967}
968
969static void
970gotBadPiece( Torrent *        t,
971             tr_piece_index_t pieceIndex )
972{
973    tr_torrent *   tor = t->tor;
974    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
975
976    tor->corruptCur += byteCount;
977    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
978}
979
980static void
981refillSoon( Torrent * t )
982{
983    if( t->refillTimer == NULL )
984        t->refillTimer = tr_timerNew( t->manager->session,
985                                      refillPulse, t,
986                                      REFILL_PERIOD_MSEC );
987}
988
989static void
990peerCallbackFunc( void * vpeer,
991                  void * vevent,
992                  void * vt )
993{
994    tr_peer *             peer = vpeer; /* may be NULL if peer is a webseed */
995    Torrent *             t = (Torrent *) vt;
996    const tr_peer_event * e = vevent;
997
998    torrentLock( t );
999
1000    switch( e->eventType )
1001    {
1002        case TR_PEER_NEED_REQ:
1003            refillSoon( t );
1004            break;
1005
1006        case TR_PEER_CANCEL:
1007            decrementPieceRequests( t, e->pieceIndex );
1008            break;
1009
1010        case TR_PEER_PEER_GOT_DATA:
1011        {
1012            const time_t now = time( NULL );
1013            tr_torrent * tor = t->tor;
1014            tor->activityDate = now;
1015            tor->uploadedCur += e->length;
1016            if( peer )
1017                tr_rcTransferred ( peer->pieceSpeed[TR_CLIENT_TO_PEER], e->length );
1018            tr_rcTransferred ( tor->pieceSpeed[TR_CLIENT_TO_PEER], e->length );
1019            tr_rcTransferred ( tor->session->pieceSpeed[TR_CLIENT_TO_PEER], e->length );
1020            tr_statsAddUploaded( tor->session, e->length );
1021            if( peer )
1022            {
1023                struct peer_atom * a = getExistingAtom( t, &peer->in_addr );
1024                a->piece_data_time = now;
1025            }
1026            break;
1027        }
1028
1029        case TR_PEER_CLIENT_GOT_DATA:
1030        {
1031            const time_t now = time( NULL );
1032            tr_torrent * tor = t->tor;
1033            tor->activityDate = now;
1034            tr_statsAddDownloaded( tor->session, e->length );
1035            if( peer )
1036                tr_rcTransferred ( peer->pieceSpeed[TR_PEER_TO_CLIENT], e->length );
1037            tr_rcTransferred ( tor->pieceSpeed[TR_PEER_TO_CLIENT], e->length );
1038            tr_rcTransferred ( tor->session->pieceSpeed[TR_PEER_TO_CLIENT], e->length );
1039            /* only add this to downloadedCur if we got it from a peer --
1040             * webseeds shouldn't count against our ratio.  As one tracker
1041             * admin put it, "Those pieces are downloaded directly from the
1042             * content distributor, not the peers, it is the tracker's job
1043             * to manage the swarms, not the web server and does not fit
1044             * into the jurisdiction of the tracker." */
1045            if( peer )
1046                tor->downloadedCur += e->length;
1047            if( peer ) {
1048                struct peer_atom * a = getExistingAtom( t, &peer->in_addr );
1049                a->piece_data_time = now;
1050            }
1051            break;
1052        }
1053
1054        case TR_PEER_PEER_PROGRESS:
1055        {
1056            if( peer )
1057            {
1058                struct peer_atom * atom = getExistingAtom( t,
1059                                                           &peer->in_addr );
1060                const int          peerIsSeed = e->progress >= 1.0;
1061                if( peerIsSeed )
1062                {
1063                    tordbg( t, "marking peer %s as a seed",
1064                           tr_peerIoAddrStr( &atom->addr,
1065                                             atom->port ) );
1066                    atom->flags |= ADDED_F_SEED_FLAG;
1067                }
1068                else
1069                {
1070                    tordbg( t, "marking peer %s as a non-seed",
1071                           tr_peerIoAddrStr( &atom->addr,
1072                                             atom->port ) );
1073                    atom->flags &= ~ADDED_F_SEED_FLAG;
1074                }
1075            }
1076            break;
1077        }
1078
1079        case TR_PEER_CLIENT_GOT_BLOCK:
1080        {
1081            tr_torrent *     tor = t->tor;
1082
1083            tr_block_index_t block = _tr_block( tor, e->pieceIndex,
1084                                                e->offset );
1085
1086            tr_cpBlockAdd( tor->completion, block );
1087            decrementPieceRequests( t, e->pieceIndex );
1088
1089            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1090
1091            if( tr_cpPieceIsComplete( tor->completion, e->pieceIndex ) )
1092            {
1093                const tr_piece_index_t p = e->pieceIndex;
1094                const int              ok = tr_ioTestPiece( tor, p );
1095
1096                if( !ok )
1097                {
1098                    tr_torerr( tor,
1099                              _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1100                              (unsigned long)p );
1101                }
1102
1103                tr_torrentSetHasPiece( tor, p, ok );
1104                tr_torrentSetPieceChecked( tor, p, TRUE );
1105                tr_peerMgrSetBlame( tor->session->peerMgr, tor->info.hash, p, ok );
1106
1107                if( !ok )
1108                    gotBadPiece( t, p );
1109                else
1110                {
1111                    int        i, peerCount;
1112                    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1113                    for( i = 0; i < peerCount; ++i )
1114                        tr_peerMsgsHave( peers[i]->msgs, p );
1115                    tr_free( peers );
1116                }
1117
1118                tr_torrentRecheckCompleteness( tor );
1119            }
1120            break;
1121        }
1122
1123        case TR_PEER_ERROR:
1124            if( e->err == EINVAL )
1125            {
1126                addStrike( t, peer );
1127                peer->doPurge = 1;
1128            }
1129            else if( ( e->err == ERANGE )
1130                  || ( e->err == EMSGSIZE )
1131                  || ( e->err == ENOTCONN ) )
1132            {
1133                /* some protocol error from the peer */
1134                peer->doPurge = 1;
1135            }
1136            else /* a local error, such as an IO error */
1137            {
1138                t->tor->error = e->err;
1139                tr_strlcpy( t->tor->errorString,
1140                            tr_strerror( t->tor->error ),
1141                            sizeof( t->tor->errorString ) );
1142                tr_torrentStop( t->tor );
1143            }
1144            break;
1145
1146        default:
1147            assert( 0 );
1148    }
1149
1150    torrentUnlock( t );
1151}
1152
1153static void
1154ensureAtomExists( Torrent *              t,
1155                  const struct in_addr * addr,
1156                  uint16_t               port,
1157                  uint8_t                flags,
1158                  uint8_t                from )
1159{
1160    if( getExistingAtom( t, addr ) == NULL )
1161    {
1162        struct peer_atom * a;
1163        a = tr_new0( struct peer_atom, 1 );
1164        a->addr = *addr;
1165        a->port = port;
1166        a->flags = flags;
1167        a->from = from;
1168        tordbg( t, "got a new atom: %s",
1169               tr_peerIoAddrStr( &a->addr, a->port ) );
1170        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
1171    }
1172}
1173
1174static int
1175getMaxPeerCount( const tr_torrent * tor )
1176{
1177    return tor->maxConnectedPeers;
1178}
1179
1180static int
1181getPeerCount( const Torrent * t )
1182{
1183    return tr_ptrArraySize( t->peers ) + tr_ptrArraySize(
1184               t->outgoingHandshakes );
1185}
1186
1187/* FIXME: this is kind of a mess. */
1188static int
1189myHandshakeDoneCB( tr_handshake *  handshake,
1190                   tr_peerIo *     io,
1191                   int             isConnected,
1192                   const uint8_t * peer_id,
1193                   void *          vmanager )
1194{
1195    int                    ok = isConnected;
1196    int                    success = FALSE;
1197    uint16_t               port;
1198    const struct in_addr * addr;
1199    tr_peerMgr *           manager = (tr_peerMgr*) vmanager;
1200    Torrent *              t;
1201    tr_handshake *         ours;
1202
1203    assert( io );
1204    assert( isConnected == 0 || isConnected == 1 );
1205
1206    t = tr_peerIoHasTorrentHash( io )
1207        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1208        : NULL;
1209
1210    if( tr_peerIoIsIncoming ( io ) )
1211        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
1212                                        handshake, handshakeCompare );
1213    else if( t )
1214        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
1215                                        handshake, handshakeCompare );
1216    else
1217        ours = handshake;
1218
1219    assert( ours );
1220    assert( ours == handshake );
1221
1222    if( t )
1223        torrentLock( t );
1224
1225    addr = tr_peerIoGetAddress( io, &port );
1226
1227    if( !ok || !t || !t->isRunning )
1228    {
1229        if( t )
1230        {
1231            struct peer_atom * atom = getExistingAtom( t, addr );
1232            if( atom )
1233                ++atom->numFails;
1234        }
1235
1236        tr_peerIoFree( io );
1237    }
1238    else /* looking good */
1239    {
1240        struct peer_atom * atom;
1241        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1242        atom = getExistingAtom( t, addr );
1243        atom->time = time( NULL );
1244
1245        if( atom->myflags & MYFLAG_BANNED )
1246        {
1247            tordbg( t, "banned peer %s tried to reconnect",
1248                   tr_peerIoAddrStr( &atom->addr,
1249                                     atom->port ) );
1250            tr_peerIoFree( io );
1251        }
1252        else if( tr_peerIoIsIncoming( io )
1253               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1254
1255        {
1256            tr_peerIoFree( io );
1257        }
1258        else
1259        {
1260            tr_peer * peer = getExistingPeer( t, addr );
1261
1262            if( peer ) /* we already have this peer */
1263            {
1264                tr_peerIoFree( io );
1265            }
1266            else
1267            {
1268                peer = getPeer( t, addr );
1269                tr_free( peer->client );
1270
1271                if( !peer_id )
1272                    peer->client = NULL;
1273                else
1274                {
1275                    char client[128];
1276                    tr_clientForId( client, sizeof( client ), peer_id );
1277                    peer->client = tr_strdup( client );
1278                }
1279                peer->port = port;
1280                peer->io = io;
1281                peer->msgs =
1282                    tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t,
1283                                    &peer->msgsTag );
1284
1285                success = TRUE;
1286            }
1287        }
1288    }
1289
1290    if( t )
1291        torrentUnlock( t );
1292
1293    return success;
1294}
1295
1296void
1297tr_peerMgrAddIncoming( tr_peerMgr *     manager,
1298                       struct in_addr * addr,
1299                       uint16_t         port,
1300                       int              socket )
1301{
1302    managerLock( manager );
1303
1304    if( tr_sessionIsAddressBlocked( manager->session, addr ) )
1305    {
1306        tr_dbg( "Banned IP address \"%s\" tried to connect to us",
1307               inet_ntoa( *addr ) );
1308        tr_netClose( socket );
1309    }
1310    else if( getExistingHandshake( manager->incomingHandshakes, addr ) )
1311    {
1312        tr_netClose( socket );
1313    }
1314    else /* we don't have a connetion to them yet... */
1315    {
1316        tr_peerIo *    io;
1317        tr_handshake * handshake;
1318
1319        io = tr_peerIoNewIncoming( manager->session, addr, port, socket );
1320
1321        handshake = tr_handshakeNew( io,
1322                                     manager->session->encryptionMode,
1323                                     myHandshakeDoneCB,
1324                                     manager );
1325
1326        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake,
1327                                 handshakeCompare );
1328    }
1329
1330    managerUnlock( manager );
1331}
1332
1333void
1334tr_peerMgrAddPex( tr_peerMgr *    manager,
1335                  const uint8_t * torrentHash,
1336                  uint8_t         from,
1337                  const tr_pex *  pex )
1338{
1339    Torrent * t;
1340
1341    managerLock( manager );
1342
1343    t = getExistingTorrent( manager, torrentHash );
1344    if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->in_addr ) )
1345        ensureAtomExists( t, &pex->in_addr, pex->port, pex->flags, from );
1346
1347    managerUnlock( manager );
1348}
1349
1350tr_pex *
1351tr_peerMgrCompactToPex( const void *    compact,
1352                        size_t          compactLen,
1353                        const uint8_t * added_f,
1354                        size_t          added_f_len,
1355                        size_t *        pexCount )
1356{
1357    size_t          i;
1358    size_t          n = compactLen / 6;
1359    const uint8_t * walk = compact;
1360    tr_pex *        pex = tr_new0( tr_pex, n );
1361
1362    for( i = 0; i < n; ++i )
1363    {
1364        memcpy( &pex[i].in_addr, walk, 4 ); walk += 4;
1365        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1366        if( added_f && ( n == added_f_len ) )
1367            pex[i].flags = added_f[i];
1368    }
1369
1370    *pexCount = n;
1371    return pex;
1372}
1373
1374/**
1375***
1376**/
1377
1378void
1379tr_peerMgrSetBlame( tr_peerMgr *     manager,
1380                    const uint8_t *  torrentHash,
1381                    tr_piece_index_t pieceIndex,
1382                    int              success )
1383{
1384    if( !success )
1385    {
1386        int        peerCount, i;
1387        Torrent *  t = getExistingTorrent( manager, torrentHash );
1388        tr_peer ** peers;
1389
1390        assert( torrentIsLocked( t ) );
1391
1392        peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1393        for( i = 0; i < peerCount; ++i )
1394        {
1395            tr_peer * peer = peers[i];
1396            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1397            {
1398                tordbg(
1399                    t,
1400                    "peer %s contributed to corrupt piece (%d); now has %d strikes",
1401                    tr_peerIoAddrStr( &peer->in_addr, peer->port ),
1402                    pieceIndex, (int)peer->strikes + 1 );
1403                addStrike( t, peer );
1404            }
1405        }
1406    }
1407}
1408
1409int
1410tr_pexCompare( const void * va,
1411               const void * vb )
1412{
1413    const tr_pex * a = va;
1414    const tr_pex * b = vb;
1415    int            i =
1416        memcmp( &a->in_addr, &b->in_addr, sizeof( struct in_addr ) );
1417
1418    if( i ) return i;
1419    if( a->port < b->port ) return -1;
1420    if( a->port > b->port ) return 1;
1421    return 0;
1422}
1423
1424int tr_pexCompare( const void * a,
1425                   const void * b );
1426
1427static int
1428peerPrefersCrypto( const tr_peer * peer )
1429{
1430    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1431        return TRUE;
1432
1433    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1434        return FALSE;
1435
1436    return tr_peerIoIsEncrypted( peer->io );
1437}
1438
1439int
1440tr_peerMgrGetPeers( tr_peerMgr *    manager,
1441                    const uint8_t * torrentHash,
1442                    tr_pex **       setme_pex )
1443{
1444    int peerCount = 0;
1445    const Torrent *  t;
1446
1447    managerLock( manager );
1448
1449    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1450    if( !t )
1451    {
1452        *setme_pex = NULL;
1453    }
1454    else
1455    {
1456        int i;
1457        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1458        tr_pex * pex = tr_new( tr_pex, peerCount );
1459        tr_pex * walk = pex;
1460
1461        for( i = 0; i < peerCount; ++i, ++walk )
1462        {
1463            const tr_peer * peer = peers[i];
1464            walk->in_addr = peer->in_addr;
1465            walk->port = peer->port;
1466            walk->flags = 0;
1467            if( peerPrefersCrypto( peer ) ) walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1468            if( peer->progress >= 1.0 ) walk->flags |= ADDED_F_SEED_FLAG;
1469        }
1470
1471        assert( ( walk - pex ) == peerCount );
1472        qsort( pex, peerCount, sizeof( tr_pex ), tr_pexCompare );
1473        *setme_pex = pex;
1474    }
1475
1476    managerUnlock( manager );
1477    return peerCount;
1478}
1479
1480static int reconnectPulse( void * vtorrent );
1481
1482static int rechokePulse( void * vtorrent );
1483
1484void
1485tr_peerMgrStartTorrent( tr_peerMgr *    manager,
1486                        const uint8_t * torrentHash )
1487{
1488    Torrent * t;
1489
1490    managerLock( manager );
1491
1492    t = getExistingTorrent( manager, torrentHash );
1493
1494    assert( t );
1495    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1496    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1497
1498    if( !t->isRunning )
1499    {
1500        t->isRunning = 1;
1501
1502        t->reconnectTimer = tr_timerNew( t->manager->session,
1503                                         reconnectPulse, t,
1504                                         RECONNECT_PERIOD_MSEC );
1505
1506        t->rechokeTimer = tr_timerNew( t->manager->session,
1507                                       rechokePulse, t,
1508                                       RECHOKE_PERIOD_MSEC );
1509
1510        reconnectPulse( t );
1511
1512        rechokePulse( t );
1513
1514        if( !tr_ptrArrayEmpty( t->webseeds ) )
1515            refillSoon( t );
1516    }
1517
1518    managerUnlock( manager );
1519}
1520
1521static void
1522stopTorrent( Torrent * t )
1523{
1524    assert( torrentIsLocked( t ) );
1525
1526    t->isRunning = 0;
1527    tr_timerFree( &t->rechokeTimer );
1528    tr_timerFree( &t->reconnectTimer );
1529
1530    /* disconnect the peers. */
1531    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)peerDestructor );
1532    tr_ptrArrayClear( t->peers );
1533
1534    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1535     * which removes the handshake from t->outgoingHandshakes... */
1536    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1537        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1538}
1539
1540void
1541tr_peerMgrStopTorrent( tr_peerMgr *    manager,
1542                       const uint8_t * torrentHash )
1543{
1544    managerLock( manager );
1545
1546    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1547
1548    managerUnlock( manager );
1549}
1550
1551void
1552tr_peerMgrAddTorrent( tr_peerMgr * manager,
1553                      tr_torrent * tor )
1554{
1555    Torrent * t;
1556
1557    managerLock( manager );
1558
1559    assert( tor );
1560    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1561
1562    t = torrentConstructor( manager, tor );
1563    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1564
1565    managerUnlock( manager );
1566}
1567
1568void
1569tr_peerMgrRemoveTorrent( tr_peerMgr *    manager,
1570                         const uint8_t * torrentHash )
1571{
1572    Torrent * t;
1573
1574    managerLock( manager );
1575
1576    t = getExistingTorrent( manager, torrentHash );
1577    assert( t );
1578    stopTorrent( t );
1579    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1580    torrentDestructor( t );
1581
1582    managerUnlock( manager );
1583}
1584
1585void
1586tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1587                               const uint8_t *    torrentHash,
1588                               int8_t *           tab,
1589                               unsigned int       tabCount )
1590{
1591    tr_piece_index_t   i;
1592    const Torrent *    t;
1593    const tr_torrent * tor;
1594    float              interval;
1595    int                isComplete;
1596    int                peerCount;
1597    const tr_peer **   peers;
1598
1599    managerLock( manager );
1600
1601    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1602    tor = t->tor;
1603    interval = tor->info.pieceCount / (float)tabCount;
1604    isComplete = tor
1605                 && ( tr_cpGetStatus ( tor->completion ) == TR_CP_COMPLETE );
1606    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1607
1608    memset( tab, 0, tabCount );
1609
1610    for( i = 0; tor && i < tabCount; ++i )
1611    {
1612        const int piece = i * interval;
1613
1614        if( isComplete || tr_cpPieceIsComplete( tor->completion, piece ) )
1615            tab[i] = -1;
1616        else if( peerCount )
1617        {
1618            int j;
1619            for( j = 0; j < peerCount; ++j )
1620                if( tr_bitfieldHas( peers[j]->have, i ) )
1621                    ++tab[i];
1622        }
1623    }
1624
1625    managerUnlock( manager );
1626}
1627
1628/* Returns the pieces that are available from peers */
1629tr_bitfield*
1630tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1631                        const uint8_t *    torrentHash )
1632{
1633    int           i, size;
1634    Torrent *     t;
1635    tr_peer **    peers;
1636    tr_bitfield * pieces;
1637
1638    managerLock( manager );
1639
1640    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1641    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1642    peers = getConnectedPeers( t, &size );
1643    for( i = 0; i < size; ++i )
1644        tr_bitfieldOr( pieces, peers[i]->have );
1645
1646    managerUnlock( manager );
1647    tr_free( peers );
1648    return pieces;
1649}
1650
1651int
1652tr_peerMgrHasConnections( const tr_peerMgr * manager,
1653                          const uint8_t *    torrentHash )
1654{
1655    int             ret;
1656    const Torrent * t;
1657
1658    managerLock( manager );
1659
1660    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1661    ret = t && ( !tr_ptrArrayEmpty( t->peers )
1662               || !tr_ptrArrayEmpty( t->webseeds ) );
1663
1664    managerUnlock( manager );
1665    return ret;
1666}
1667
1668void
1669tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1670                        const uint8_t *    torrentHash,
1671                        int *              setmePeersKnown,
1672                        int *              setmePeersConnected,
1673                        int *              setmeSeedsConnected,
1674                        int *              setmeWebseedsSendingToUs,
1675                        int *              setmePeersSendingToUs,
1676                        int *              setmePeersGettingFromUs,
1677                        int *              setmePeersFrom )
1678{
1679    int                 i, size;
1680    const Torrent *     t;
1681    const tr_peer **    peers;
1682    const tr_webseed ** webseeds;
1683
1684    managerLock( manager );
1685
1686    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1687    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1688
1689    *setmePeersKnown           = tr_ptrArraySize( t->pool );
1690    *setmePeersConnected       = 0;
1691    *setmeSeedsConnected       = 0;
1692    *setmePeersGettingFromUs   = 0;
1693    *setmePeersSendingToUs     = 0;
1694    *setmeWebseedsSendingToUs  = 0;
1695
1696    for( i = 0; i < TR_PEER_FROM__MAX; ++i )
1697        setmePeersFrom[i] = 0;
1698
1699    for( i = 0; i < size; ++i )
1700    {
1701        const tr_peer *          peer = peers[i];
1702        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1703
1704        if( peer->io == NULL ) /* not connected */
1705            continue;
1706
1707        ++ * setmePeersConnected;
1708
1709        ++setmePeersFrom[atom->from];
1710
1711        if( clientIsDownloadingFrom( peer ) )
1712            ++ * setmePeersSendingToUs;
1713
1714        if( clientIsUploadingTo( peer ) )
1715            ++ * setmePeersGettingFromUs;
1716
1717        if( atom->flags & ADDED_F_SEED_FLAG )
1718            ++ * setmeSeedsConnected;
1719    }
1720
1721    webseeds = (const tr_webseed **) tr_ptrArrayPeek( t->webseeds, &size );
1722    for( i = 0; i < size; ++i )
1723    {
1724        if( tr_webseedIsActive( webseeds[i] ) )
1725            ++ * setmeWebseedsSendingToUs;
1726    }
1727
1728    managerUnlock( manager );
1729}
1730
1731float*
1732tr_peerMgrWebSpeeds( const tr_peerMgr * manager,
1733                     const uint8_t *    torrentHash )
1734{
1735    const Torrent *     t;
1736    const tr_webseed ** webseeds;
1737    int                 i;
1738    int                 webseedCount;
1739    float *             ret;
1740
1741    assert( manager );
1742    managerLock( manager );
1743
1744    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1745    webseeds = (const tr_webseed**) tr_ptrArrayPeek( t->webseeds,
1746                                                     &webseedCount );
1747    assert( webseedCount == t->tor->info.webseedCount );
1748    ret = tr_new0( float, webseedCount );
1749
1750    for( i = 0; i < webseedCount; ++i )
1751        if( !tr_webseedGetSpeed( webseeds[i], &ret[i] ) )
1752            ret[i] = -1.0;
1753
1754    managerUnlock( manager );
1755    return ret;
1756}
1757
1758double
1759tr_peerGetPieceSpeed( const tr_peer    * peer,
1760                      tr_direction       direction )
1761{
1762    assert( peer );
1763    assert( direction==TR_CLIENT_TO_PEER || direction==TR_PEER_TO_CLIENT );
1764
1765    return tr_rcRate( peer->pieceSpeed[direction] );
1766}
1767
1768
1769struct tr_peer_stat *
1770tr_peerMgrPeerStats( const   tr_peerMgr  * manager,
1771                     const   uint8_t     * torrentHash,
1772                     int                 * setmeCount UNUSED )
1773{
1774    int             i, size;
1775    const Torrent * t;
1776    tr_peer **      peers;
1777    tr_peer_stat *  ret;
1778
1779    assert( manager );
1780    managerLock( manager );
1781
1782    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1783    peers = getConnectedPeers( (Torrent*)t, &size );
1784    ret = tr_new0( tr_peer_stat, size );
1785
1786    for( i = 0; i < size; ++i )
1787    {
1788        char *                   pch;
1789        const tr_peer *          peer = peers[i];
1790        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1791        tr_peer_stat *           stat = ret + i;
1792
1793        tr_netNtop( &peer->in_addr, stat->addr, sizeof( stat->addr ) );
1794        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
1795                   sizeof( stat->client ) );
1796        stat->port               = peer->port;
1797        stat->from               = atom->from;
1798        stat->progress           = peer->progress;
1799        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1800        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER );
1801        stat->rateToClient       = tr_peerGetPieceSpeed( peer, TR_PEER_TO_CLIENT );
1802        stat->peerIsChoked       = peer->peerIsChoked;
1803        stat->peerIsInterested   = peer->peerIsInterested;
1804        stat->clientIsChoked     = peer->clientIsChoked;
1805        stat->clientIsInterested = peer->clientIsInterested;
1806        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1807        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1808        stat->isUploadingTo      = clientIsUploadingTo( peer );
1809
1810        pch = stat->flagStr;
1811        if( t->optimistic == peer ) *pch++ = 'O';
1812        if( stat->isDownloadingFrom ) *pch++ = 'D';
1813        else if( stat->clientIsInterested ) *pch++ = 'd';
1814        if( stat->isUploadingTo ) *pch++ = 'U';
1815        else if( stat->peerIsInterested ) *pch++ = 'u';
1816        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ =
1817                'K';
1818        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1819        if( stat->isEncrypted ) *pch++ = 'E';
1820        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1821        if( stat->isIncoming ) *pch++ = 'I';
1822        *pch = '\0';
1823    }
1824
1825    *setmeCount = size;
1826    tr_free( peers );
1827
1828    managerUnlock( manager );
1829    return ret;
1830}
1831
1832/**
1833***
1834**/
1835
1836struct ChokeData
1837{
1838    unsigned int    doUnchoke    : 1;
1839    unsigned int    isInterested : 1;
1840    double          rateToClient;
1841    double          rateToPeer;
1842    tr_peer *       peer;
1843};
1844
1845static int
1846tr_compareDouble( double a,
1847                  double b )
1848{
1849    if( a < b ) return -1;
1850    if( a > b ) return 1;
1851    return 0;
1852}
1853
1854static int
1855compareChoke( const void * va,
1856              const void * vb )
1857{
1858    const struct ChokeData * a = va;
1859    const struct ChokeData * b = vb;
1860    int                      diff = 0;
1861
1862    if( diff == 0 ) /* prefer higher dl speeds */
1863        diff = -tr_compareDouble( a->rateToClient, b->rateToClient );
1864    if( diff == 0 ) /* prefer higher ul speeds */
1865        diff = -tr_compareDouble( a->rateToPeer, b->rateToPeer );
1866    if( diff == 0 ) /* prefer unchoked */
1867        diff = (int)a->peer->peerIsChoked - (int)b->peer->peerIsChoked;
1868
1869    return diff;
1870}
1871
1872static int
1873isNew( const tr_peer * peer )
1874{
1875    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1876}
1877
1878static int
1879isSame( const tr_peer * peer )
1880{
1881    return peer && peer->client && strstr( peer->client, "Transmission" );
1882}
1883
1884/**
1885***
1886**/
1887
1888static void
1889rechoke( Torrent * t )
1890{
1891    int                i, peerCount, size, unchokedInterested;
1892    tr_peer **         peers = getConnectedPeers( t, &peerCount );
1893    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1894    const int          chokeAll = !tr_torrentPieceTransferIsAllowed( t->tor, TR_CLIENT_TO_PEER );
1895
1896    assert( torrentIsLocked( t ) );
1897
1898    /* sort the peers by preference and rate */
1899    for( i = 0, size = 0; i < peerCount; ++i )
1900    {
1901        tr_peer * peer = peers[i];
1902        if( peer->progress >= 1.0 ) /* choke all seeds */
1903            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1904        else if( chokeAll )
1905            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1906        else {
1907            struct ChokeData * n = &choke[size++];
1908            n->peer         = peer;
1909            n->isInterested = peer->peerIsInterested;
1910            n->rateToPeer   = tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER );
1911            n->rateToClient = tr_peerGetPieceSpeed( peer, TR_PEER_TO_CLIENT );
1912        }
1913    }
1914
1915    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
1916
1917    /**
1918     * Reciprocation and number of uploads capping is managed by unchoking
1919     * the N peers which have the best upload rate and are interested.
1920     * This maximizes the client's download rate. These N peers are
1921     * referred to as downloaders, because they are interested in downloading
1922     * from the client.
1923     *
1924     * Peers which have a better upload rate (as compared to the downloaders)
1925     * but aren't interested get unchoked. If they become interested, the
1926     * downloader with the worst upload rate gets choked. If a client has
1927     * a complete file, it uses its upload rate rather than its download
1928     * rate to decide which peers to unchoke.
1929     */
1930    unchokedInterested = 0;
1931    for( i = 0; i < size && unchokedInterested < MAX_UNCHOKED_PEERS; ++i )
1932    {
1933        choke[i].doUnchoke = 1;
1934        if( choke[i].isInterested )
1935            ++unchokedInterested;
1936    }
1937
1938    /* optimistic unchoke */
1939    if( i < size )
1940    {
1941        int                n;
1942        struct ChokeData * c;
1943        tr_ptrArray *      randPool = tr_ptrArrayNew( );
1944
1945        for( ; i < size; ++i )
1946        {
1947            if( choke[i].isInterested )
1948            {
1949                const tr_peer * peer = choke[i].peer;
1950                int             x = 1, y;
1951                if( isNew( peer ) ) x *= 3;
1952                if( isSame( peer ) ) x *= 3;
1953                for( y = 0; y < x; ++y )
1954                    tr_ptrArrayAppend( randPool, &choke[i] );
1955            }
1956        }
1957
1958        if( ( n = tr_ptrArraySize( randPool ) ) )
1959        {
1960            c = tr_ptrArrayNth( randPool, tr_cryptoWeakRandInt( n ) );
1961            c->doUnchoke = 1;
1962            t->optimistic = c->peer;
1963        }
1964
1965        tr_ptrArrayFree( randPool, NULL );
1966    }
1967
1968    for( i = 0; i < size; ++i )
1969        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1970
1971    /* cleanup */
1972    tr_free( choke );
1973    tr_free( peers );
1974}
1975
1976static int
1977rechokePulse( void * vtorrent )
1978{
1979    Torrent * t = vtorrent;
1980
1981    torrentLock( t );
1982    rechoke( t );
1983    torrentUnlock( t );
1984    return TRUE;
1985}
1986
1987/***
1988****
1989****  Life and Death
1990****
1991***/
1992
1993static int
1994shouldPeerBeClosed( const Torrent * t,
1995                    const tr_peer * peer,
1996                    int             peerCount )
1997{
1998    const tr_torrent *       tor = t->tor;
1999    const time_t             now = time( NULL );
2000    const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
2001
2002    /* if it's marked for purging, close it */
2003    if( peer->doPurge )
2004    {
2005        tordbg( t, "purging peer %s because its doPurge flag is set",
2006               tr_peerIoAddrStr( &atom->addr,
2007                                 atom->port ) );
2008        return TRUE;
2009    }
2010
2011    /* if we're seeding and the peer has everything we have,
2012     * and enough time has passed for a pex exchange, then disconnect */
2013    if( tr_torrentIsSeed( tor ) )
2014    {
2015        int peerHasEverything;
2016        if( atom->flags & ADDED_F_SEED_FLAG )
2017            peerHasEverything = TRUE;
2018        else if( peer->progress < tr_cpPercentDone( tor->completion ) )
2019            peerHasEverything = FALSE;
2020        else
2021        {
2022            tr_bitfield * tmp =
2023                tr_bitfieldDup( tr_cpPieceBitfield( tor->completion ) );
2024            tr_bitfieldDifference( tmp, peer->have );
2025            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2026            tr_bitfieldFree( tmp );
2027        }
2028        if( peerHasEverything
2029          && ( !tr_torrentAllowsPex( tor ) || ( now - atom->time >= 30 ) ) )
2030        {
2031            tordbg( t, "purging peer %s because we're both seeds",
2032                   tr_peerIoAddrStr( &atom->addr,
2033                                     atom->port ) );
2034            return TRUE;
2035        }
2036    }
2037
2038    /* disconnect if it's been too long since piece data has been transferred.
2039     * this is on a sliding scale based on number of available peers... */
2040    {
2041        const int    relaxStrictnessIfFewerThanN =
2042            (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2043        /* if we have >= relaxIfFewerThan, strictness is 100%.
2044         * if we have zero connections, strictness is 0% */
2045        const float  strictness = peerCount >= relaxStrictnessIfFewerThanN
2046                                  ? 1.0
2047                                  : peerCount /
2048                                  (float)relaxStrictnessIfFewerThanN;
2049        const int    lo = MIN_UPLOAD_IDLE_SECS;
2050        const int    hi = MAX_UPLOAD_IDLE_SECS;
2051        const int    limit = lo + ( ( hi - lo ) * strictness );
2052        const time_t then = peer->pieceDataActivityDate;
2053        const int    idleTime = then ? ( now - then ) : 0;
2054        if( idleTime > limit )
2055        {
2056            tordbg(
2057                t,
2058                "purging peer %s because it's been %d secs since we shared anything",
2059                tr_peerIoAddrStr( &atom->addr, atom->port ), idleTime );
2060            return TRUE;
2061        }
2062    }
2063
2064    return FALSE;
2065}
2066
2067static tr_peer **
2068getPeersToClose( Torrent * t,
2069                 int *     setmeSize )
2070{
2071    int               i, peerCount, outsize;
2072    tr_peer **        peers = (tr_peer**) tr_ptrArrayPeek( t->peers,
2073                                                           &peerCount );
2074    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2075
2076    assert( torrentIsLocked( t ) );
2077
2078    for( i = outsize = 0; i < peerCount; ++i )
2079        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
2080            ret[outsize++] = peers[i];
2081
2082    *setmeSize = outsize;
2083    return ret;
2084}
2085
2086static int
2087compareCandidates( const void * va,
2088                   const void * vb )
2089{
2090    const struct peer_atom * a = *(const struct peer_atom**) va;
2091    const struct peer_atom * b = *(const struct peer_atom**) vb;
2092
2093    /* <Charles> Here we would probably want to try reconnecting to
2094     * peers that had most recently given us data. Lots of users have
2095     * trouble with resets due to their routers and/or ISPs. This way we
2096     * can quickly recover from an unwanted reset. So we sort
2097     * piece_data_time in descending order.
2098     */
2099
2100    if( a->piece_data_time != b->piece_data_time )
2101        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2102
2103    if( a->numFails != b->numFails )
2104        return a->numFails < b->numFails ? -1 : 1;
2105
2106    if( a->time != b->time )
2107        return a->time < b->time ? -1 : 1;
2108
2109    return 0;
2110}
2111
2112static int
2113getReconnectIntervalSecs( const struct peer_atom * atom )
2114{
2115    int          sec;
2116    const time_t now = time( NULL );
2117
2118    /* if we were recently connected to this peer and transferring piece
2119     * data, try to reconnect to them sooner rather that later -- we don't
2120     * want network troubles to get in the way of a good peer. */
2121    if( ( now - atom->piece_data_time ) <=
2122       ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2123        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2124
2125    /* don't allow reconnects more often than our minimum */
2126    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2127        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2128
2129    /* otherwise, the interval depends on how many times we've tried
2130     * and failed to connect to the peer */
2131    else switch( atom->numFails )
2132        {
2133            case 0:
2134                sec = 0; break;
2135
2136            case 1:
2137                sec = 5; break;
2138
2139            case 2:
2140                sec = 2 * 60; break;
2141
2142            case 3:
2143                sec = 15 * 60; break;
2144
2145            case 4:
2146                sec = 30 * 60; break;
2147
2148            case 5:
2149                sec = 60 * 60; break;
2150
2151            default:
2152                sec = 120 * 60; break;
2153        }
2154
2155    return sec;
2156}
2157
2158static struct peer_atom **
2159getPeerCandidates(                               Torrent * t,
2160                                           int * setmeSize )
2161{
2162    int                 i, atomCount, retCount;
2163    struct peer_atom ** atoms;
2164    struct peer_atom ** ret;
2165    const time_t        now = time( NULL );
2166    const int           seed = tr_torrentIsSeed( t->tor );
2167
2168    assert( torrentIsLocked( t ) );
2169
2170    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &atomCount );
2171    ret = tr_new( struct peer_atom*, atomCount );
2172    for( i = retCount = 0; i < atomCount; ++i )
2173    {
2174        int                interval;
2175        struct peer_atom * atom = atoms[i];
2176
2177        /* peer fed us too much bad data ... we only keep it around
2178         * now to weed it out in case someone sends it to us via pex */
2179        if( atom->myflags & MYFLAG_BANNED )
2180            continue;
2181
2182        /* peer was unconnectable before, so we're not going to keep trying.
2183         * this is needs a separate flag from `banned', since if they try
2184         * to connect to us later, we'll let them in */
2185        if( atom->myflags & MYFLAG_UNREACHABLE )
2186            continue;
2187
2188        /* we don't need two connections to the same peer... */
2189        if( peerIsInUse( t, &atom->addr ) )
2190            continue;
2191
2192        /* no need to connect if we're both seeds... */
2193        if( seed && ( atom->flags & ADDED_F_SEED_FLAG ) )
2194            continue;
2195
2196        /* don't reconnect too often */
2197        interval = getReconnectIntervalSecs( atom );
2198        if( ( now - atom->time ) < interval )
2199        {
2200            tordbg(
2201                t,
2202                "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2203                i, tr_peerIoAddrStr( &atom->addr,
2204                                     atom->port ), interval );
2205            continue;
2206        }
2207
2208        /* Don't connect to peers in our blocklist */
2209        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2210            continue;
2211
2212        ret[retCount++] = atom;
2213    }
2214
2215    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2216    *setmeSize = retCount;
2217    return ret;
2218}
2219
2220static int
2221reconnectPulse( void * vtorrent )
2222{
2223    Torrent *     t = vtorrent;
2224    static time_t prevTime = 0;
2225    static int    newConnectionsThisSecond = 0;
2226    time_t        now;
2227
2228    torrentLock( t );
2229
2230    now = time( NULL );
2231    if( prevTime != now )
2232    {
2233        prevTime = now;
2234        newConnectionsThisSecond = 0;
2235    }
2236
2237    if( !t->isRunning )
2238    {
2239        removeAllPeers( t );
2240    }
2241    else
2242    {
2243        int                 i, nCandidates, nBad;
2244        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
2245        struct tr_peer **   connections = getPeersToClose( t, &nBad );
2246
2247        if( nBad || nCandidates )
2248            tordbg(
2249                t, "reconnect pulse for [%s]: %d bad connections, "
2250                   "%d connection candidates, %d atoms, max per pulse is %d",
2251                t->tor->info.name, nBad, nCandidates,
2252                tr_ptrArraySize( t->pool ),
2253                (int)MAX_RECONNECTIONS_PER_PULSE );
2254
2255        /* disconnect some peers.
2256           if we transferred piece data, then they might be good peers,
2257           so reset their `numFails' weight to zero.  otherwise we connected
2258           to them fruitlessly, so mark it as another fail */
2259        for( i = 0; i < nBad; ++i )
2260        {
2261            tr_peer *          peer = connections[i];
2262            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
2263            if( peer->pieceDataActivityDate )
2264                atom->numFails = 0;
2265            else
2266                ++atom->numFails;
2267            tordbg( t, "removing bad peer %s",
2268                   tr_peerIoGetAddrStr( peer->io ) );
2269            removePeer( t, peer );
2270        }
2271
2272        /* add some new ones */
2273        for( i = 0;    ( i < nCandidates )
2274           && ( i < MAX_RECONNECTIONS_PER_PULSE )
2275           && ( getPeerCount( t ) < getMaxPeerCount( t->tor ) )
2276           && ( newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND );
2277             ++i )
2278        {
2279            tr_peerMgr *       mgr = t->manager;
2280            struct peer_atom * atom = candidates[i];
2281            tr_peerIo *        io;
2282
2283            tordbg( t, "Starting an OUTGOING connection with %s",
2284                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2285
2286            io =
2287                tr_peerIoNewOutgoing( mgr->session, &atom->addr, atom->port,
2288                                      t->hash );
2289            if( io == NULL )
2290            {
2291                atom->myflags |= MYFLAG_UNREACHABLE;
2292            }
2293            else
2294            {
2295                tr_handshake * handshake = tr_handshakeNew(
2296                    io,
2297                    mgr->session->
2298                    encryptionMode,
2299                    myHandshakeDoneCB,
2300                    mgr );
2301
2302                assert( tr_peerIoGetTorrentHash( io ) );
2303
2304                ++newConnectionsThisSecond;
2305
2306                tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake,
2307                                         handshakeCompare );
2308            }
2309
2310            atom->time = time( NULL );
2311        }
2312
2313        /* cleanup */
2314        tr_free( connections );
2315        tr_free( candidates );
2316    }
2317
2318    torrentUnlock( t );
2319    return TRUE;
2320}
2321
2322/****
2323*****
2324*****  BANDWIDTH ALLOCATION
2325*****
2326****/
2327
2328static double
2329allocateHowMuch( double         desiredAvgKB,
2330                 const double * history )
2331{
2332    const double baseline = desiredAvgKB * 1024.0 /
2333                            BANDWIDTH_PULSES_PER_SECOND;
2334    const double min = baseline * 0.85;
2335    const double max = baseline * 1.15;
2336    int          i;
2337    double       usedBytes;
2338    double       n;
2339    double       clamped;
2340
2341    for( usedBytes = i = 0; i < BANDWIDTH_PULSE_HISTORY; ++i )
2342        usedBytes += history[i];
2343
2344    n = ( desiredAvgKB * 1024.0 )
2345      * ( BANDWIDTH_PULSE_HISTORY + 1.0 )
2346      / BANDWIDTH_PULSES_PER_SECOND
2347      - usedBytes;
2348
2349    /* clamp the return value to lessen oscillation */
2350    clamped = n;
2351    clamped = MAX( clamped, min );
2352    clamped = MIN( clamped, max );
2353/*fprintf( stderr, "desiredAvgKB is %.2f, rate is %.2f, allocating %.2f
2354  (%.2f)\n", desiredAvgKB,
2355  ((usedBytes*BANDWIDTH_PULSES_PER_SECOND)/BANDWIDTH_PULSE_HISTORY)/1024.0,
2356  clamped/1024.0, n/1024.0 );*/
2357    return clamped;
2358}
2359
2360/**
2361 * Distributes a fixed amount of bandwidth among a set of peers.
2362 *
2363 * @param peerArray peers whose client-to-peer bandwidth will be set
2364 * @param direction whether to allocate upload or download bandwidth
2365 * @param history recent bandwidth history for these peers
2366 * @param desiredAvgKB overall bandwidth goal for this set of peers
2367 */
2368static void
2369setPeerBandwidth( tr_ptrArray *      peerArray,
2370                  const tr_direction direction,
2371                  const double *     history,
2372                  double             desiredAvgKB )
2373{
2374    const int    peerCount = tr_ptrArraySize( peerArray );
2375    const double bytes = allocateHowMuch( desiredAvgKB, history );
2376    const double welfareBytes = MIN( 2048, bytes * 0.2 );
2377    const double meritBytes = MAX( 0, bytes - welfareBytes );
2378    tr_peer **   peers = (tr_peer**) tr_ptrArrayBase( peerArray );
2379    tr_peer **   candidates = tr_new( tr_peer *, peerCount );
2380    int          i;
2381    int          candidateCount;
2382    double       welfare;
2383    size_t       bytesUsed;
2384
2385    assert( meritBytes >= 0.0 );
2386    assert( welfareBytes >= 0.0 );
2387    assert( direction == TR_UP || direction == TR_DOWN );
2388
2389    for( i = candidateCount = 0; i < peerCount; ++i )
2390        if( tr_peerIoWantsBandwidth( peers[i]->io, direction ) )
2391            candidates[candidateCount++] = peers[i];
2392        else
2393            tr_peerIoSetBandwidth( peers[i]->io, direction, 0 );
2394
2395    for( i = bytesUsed = 0; i < candidateCount; ++i )
2396        bytesUsed += tr_peerIoGetBandwidthUsed( candidates[i]->io,
2397                                                direction );
2398
2399    welfare = welfareBytes / candidateCount;
2400
2401    for( i = 0; i < candidateCount; ++i )
2402    {
2403        tr_peer *    peer = candidates[i];
2404        const double merit = bytesUsed
2405                             ? ( meritBytes *
2406                                tr_peerIoGetBandwidthUsed( peer->io,
2407                                                           direction ) ) /
2408                             bytesUsed
2409                             : ( meritBytes / candidateCount );
2410        tr_peerIoSetBandwidth( peer->io, direction, merit + welfare );
2411    }
2412
2413    /* cleanup */
2414    tr_free( candidates );
2415}
2416
2417static size_t
2418countHandshakeBandwidth( tr_ptrArray * handshakes,
2419                         tr_direction  direction )
2420{
2421    const int n = tr_ptrArraySize( handshakes );
2422    int       i;
2423    size_t    total;
2424
2425    for( i = total = 0; i < n; ++i )
2426    {
2427        tr_peerIo * io = tr_handshakeGetIO( tr_ptrArrayNth( handshakes, i ) );
2428        total += tr_peerIoGetBandwidthUsed( io, direction );
2429    }
2430    return total;
2431}
2432
2433static size_t
2434countPeerBandwidth( tr_ptrArray * peers,
2435                    tr_direction  direction )
2436{
2437    const int n = tr_ptrArraySize( peers );
2438    int       i;
2439    size_t    total;
2440
2441    for( i = total = 0; i < n; ++i )
2442    {
2443        tr_peer * peer = tr_ptrArrayNth( peers, i );
2444        total += tr_peerIoGetBandwidthUsed( peer->io, direction );
2445    }
2446    return total;
2447}
2448
2449static void
2450givePeersUnlimitedBandwidth( tr_ptrArray * peers,
2451                             tr_direction  direction )
2452{
2453    const int n = tr_ptrArraySize( peers );
2454    int       i;
2455
2456    for( i = 0; i < n; ++i )
2457    {
2458        tr_peer * peer = tr_ptrArrayNth( peers, i );
2459        tr_peerIoSetBandwidthUnlimited( peer->io, direction );
2460    }
2461}
2462
2463static void
2464pumpAllPeers( tr_peerMgr * mgr )
2465{
2466    const int torrentCount = tr_ptrArraySize( mgr->torrents );
2467    int       i, j;
2468
2469    for( i = 0; i < torrentCount; ++i )
2470    {
2471        Torrent * t = tr_ptrArrayNth( mgr->torrents, i );
2472        for( j = 0; j < tr_ptrArraySize( t->peers ); ++j )
2473        {
2474            tr_peer * peer = tr_ptrArrayNth( t->peers, j );
2475            tr_peerMsgsPulse( peer->msgs );
2476        }
2477    }
2478}
2479
2480/**
2481 * Allocate bandwidth for each peer connection.
2482 *
2483 * @param mgr the peer manager
2484 * @param direction whether to allocate upload or download bandwidth
2485 * @return the amount of directional bandwidth used since the last pulse.
2486 */
2487static double
2488allocateBandwidth( tr_peerMgr * mgr,
2489                   tr_direction direction )
2490{
2491    tr_session *  session = mgr->session;
2492    const int     pulseNumber = mgr->bandwidthPulseNumber;
2493    const int     torrentCount = tr_ptrArraySize( mgr->torrents );
2494    Torrent **    torrents = (Torrent **) tr_ptrArrayBase( mgr->torrents );
2495    tr_ptrArray * globalPool = tr_ptrArrayNew( );
2496    double        allBytesUsed = 0;
2497    size_t        poolBytesUsed = 0;
2498    int           i;
2499
2500    assert( mgr );
2501    assert( direction == TR_UP || direction == TR_DOWN );
2502
2503    /* before allocating bandwidth, pump the connected peers */
2504    pumpAllPeers( mgr );
2505
2506    for( i = 0; i < torrentCount; ++i )
2507    {
2508        Torrent * t = torrents[i];
2509        const size_t used = countPeerBandwidth( t->peers, direction );
2510        tr_speedlimit speedMode;
2511
2512        countHandshakeBandwidth( t->outgoingHandshakes, direction );
2513
2514        /* remember this torrent's bytes used */
2515        t->tor->rateHistory[direction][pulseNumber] = used;
2516
2517        /* add this torrent's bandwidth use to allBytesUsed */
2518        allBytesUsed += used;
2519
2520        /* if piece data is disallowed, don't bother limiting bandwidth --
2521         * we won't be asking for, or sending out, any pieces */
2522        if( !tr_torrentPieceTransferIsAllowed( t->tor, direction ) )
2523            speedMode = TR_SPEEDLIMIT_UNLIMITED;
2524        else
2525            speedMode = tr_torrentGetSpeedMode( t->tor, direction );
2526           
2527        /* process the torrent's peers based on its speed mode */
2528        switch( speedMode )
2529        {
2530            case TR_SPEEDLIMIT_UNLIMITED:
2531                givePeersUnlimitedBandwidth( t->peers, direction );
2532                break;
2533
2534            case TR_SPEEDLIMIT_SINGLE:
2535                setPeerBandwidth( t->peers, direction,
2536                                  t->tor->rateHistory[direction],
2537                                  tr_torrentGetSpeedLimit( t->tor,
2538                                                           direction ) );
2539                break;
2540
2541            case TR_SPEEDLIMIT_GLOBAL:
2542            {
2543                int       i;
2544                const int n = tr_ptrArraySize( t->peers );
2545                for( i = 0; i < n; ++i )
2546                    tr_ptrArrayAppend( globalPool,
2547                                      tr_ptrArrayNth( t->peers, i ) );
2548                poolBytesUsed += used;
2549                break;
2550            }
2551        }
2552    }
2553
2554    /* add incoming handshakes to the global pool */
2555    i = countHandshakeBandwidth( mgr->incomingHandshakes, direction );
2556    allBytesUsed += i;
2557    poolBytesUsed += i;
2558
2559    mgr->globalPoolHistory[direction][pulseNumber] = poolBytesUsed;
2560
2561    /* handle the global pool's connections */
2562    if( !tr_sessionIsSpeedLimitEnabled( session, direction ) )
2563        givePeersUnlimitedBandwidth( globalPool, direction );
2564    else
2565        setPeerBandwidth( globalPool, direction,
2566                         mgr->globalPoolHistory[direction],
2567                         tr_sessionGetSpeedLimit( session, direction ) );
2568
2569    /* now that we've allocated bandwidth, pump all the connected peers */
2570    pumpAllPeers( mgr );
2571
2572    /* cleanup */
2573    tr_ptrArrayFree( globalPool, NULL );
2574    return allBytesUsed;
2575}
2576
2577static int
2578bandwidthPulse( void * vmgr )
2579{
2580    tr_peerMgr * mgr = vmgr;
2581    int          i;
2582
2583    managerLock( mgr );
2584
2585    /* keep track of how far we are into the cycle */
2586    if( ++mgr->bandwidthPulseNumber == BANDWIDTH_PULSE_HISTORY )
2587        mgr->bandwidthPulseNumber = 0;
2588
2589    /* allocate the upload and download bandwidth */
2590    for( i = 0; i < 2; ++i )
2591        allocateBandwidth( mgr, i );
2592
2593    managerUnlock( mgr );
2594    return TRUE;
2595}
2596
Note: See TracBrowser for help on using the repository browser.