source: trunk/libtransmission/peer-mgr.c @ 6954

Last change on this file since 6954 was 6954, checked in by charles, 13 years ago

new & improved fix for #617: Transmission goes above the set bandwidth limits

  • Property svn:keywords set to Date Rev Author Id
File size: 72.3 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 6954 2008-10-25 02:20:16Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h> /* memcpy, memcmp, strstr */
16#include <stdlib.h> /* qsort */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "blocklist.h"
23#include "clients.h"
24#include "completion.h"
25#include "crypto.h"
26#include "handshake.h"
27#include "inout.h" /* tr_ioTestPiece */
28#include "net.h"
29#include "peer-io.h"
30#include "peer-mgr.h"
31#include "peer-mgr-private.h"
32#include "peer-msgs.h"
33#include "ptrarray.h"
34#include "stats.h" /* tr_statsAddDownloaded */
35#include "torrent.h"
36#include "trevent.h"
37#include "utils.h"
38#include "webseed.h"
39
40enum
41{
42    /* how frequently to change which peers are choked */
43    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
44
45    /* minimum interval for refilling peers' request lists */
46    REFILL_PERIOD_MSEC = 333,
47
48    /* when many peers are available, keep idle ones this long */
49    MIN_UPLOAD_IDLE_SECS = ( 60 * 3 ),
50
51    /* when few peers are available, keep idle ones this long */
52    MAX_UPLOAD_IDLE_SECS = ( 60 * 10 ),
53
54    /* how frequently to decide which peers live and die */
55    RECONNECT_PERIOD_MSEC = ( 2 * 1000 ),
56
57    /* max # of peers to ask fer per torrent per reconnect pulse */
58    MAX_RECONNECTIONS_PER_PULSE = 2,
59
60    /* max number of peers to ask for per second overall.
61    * this throttle is to avoid overloading the router */
62    MAX_CONNECTIONS_PER_SECOND = 4,
63
64    /* number of unchoked peers per torrent.
65     * FIXME: this probably ought to be configurable */
66    MAX_UNCHOKED_PEERS = 12,
67
68    /* number of bad pieces a peer is allowed to send before we ban them */
69    MAX_BAD_PIECES_PER_PEER = 3,
70
71    /* use for bitwise operations w/peer_atom.myflags */
72    MYFLAG_BANNED = 1,
73
74    /* unreachable for now... but not banned.
75     * if they try to connect to us it's okay */
76    MYFLAG_UNREACHABLE = 2,
77
78    /* the minimum we'll wait before attempting to reconnect to a peer */
79    MINIMUM_RECONNECT_INTERVAL_SECS = 5
80};
81
82
83/**
84***
85**/
86
87/* We keep one of these for every peer we know about, whether
88 * it's connected or not, so the struct must be small.
89 * When our current connections underperform, we dip back
90 * into this list for new ones. */
91struct peer_atom
92{
93    uint8_t           from;
94    uint8_t           flags; /* these match the added_f flags */
95    uint8_t           myflags; /* flags that aren't defined in added_f */
96    uint16_t          port;
97    uint16_t          numFails;
98    struct in_addr    addr;
99    time_t            time; /* when the peer's connection status last changed */
100    time_t            piece_data_time;
101};
102
103typedef struct
104{
105    unsigned int    isRunning : 1;
106
107    uint8_t         hash[SHA_DIGEST_LENGTH];
108    int         *   pendingRequestCount;
109    tr_ptrArray *   outgoingHandshakes; /* tr_handshake */
110    tr_ptrArray *   pool; /* struct peer_atom */
111    tr_ptrArray *   peers; /* tr_peer */
112    tr_ptrArray *   webseeds; /* tr_webseed */
113    tr_timer *      reconnectTimer;
114    tr_timer *      rechokeTimer;
115    tr_timer *      refillTimer;
116    tr_torrent *    tor;
117    tr_peer *       optimistic; /* the optimistic peer, or NULL if none */
118
119    struct tr_peerMgr * manager;
120}
121Torrent;
122
123struct tr_peerMgr
124{
125    uint8_t        bandwidthPulseNumber;
126    tr_session *   session;
127    tr_ptrArray *  torrents; /* Torrent */
128    tr_ptrArray *  incomingHandshakes; /* tr_handshake */
129    tr_timer *     bandwidthTimer;
130    double         rateHistory[2][BANDWIDTH_PULSE_HISTORY];
131    double         globalPoolHistory[2][BANDWIDTH_PULSE_HISTORY];
132};
133
134#define tordbg( t, ... ) \
135    tr_deepLog( __FILE__, __LINE__, t->tor->info.name, __VA_ARGS__ )
136
137#define dbgmsg( ... ) \
138    tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ )
139
140/**
141***
142**/
143
144static void
145managerLock( const struct tr_peerMgr * manager )
146{
147    tr_globalLock( manager->session );
148}
149
150static void
151managerUnlock( const struct tr_peerMgr * manager )
152{
153    tr_globalUnlock( manager->session );
154}
155
156static void
157torrentLock( Torrent * torrent )
158{
159    managerLock( torrent->manager );
160}
161
162static void
163torrentUnlock( Torrent * torrent )
164{
165    managerUnlock( torrent->manager );
166}
167
168static int
169torrentIsLocked( const Torrent * t )
170{
171    return tr_globalIsLocked( t->manager->session );
172}
173
174/**
175***
176**/
177
178static int
179compareAddresses( const struct in_addr * a,
180                  const struct in_addr * b )
181{
182    if( a->s_addr != b->s_addr )
183        return a->s_addr < b->s_addr ? -1 : 1;
184
185    return 0;
186}
187
188static int
189handshakeCompareToAddr( const void * va,
190                        const void * vb )
191{
192    const tr_handshake * a = va;
193
194    return compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
195}
196
197static int
198handshakeCompare( const void * a,
199                  const void * b )
200{
201    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
202}
203
204static tr_handshake*
205getExistingHandshake( tr_ptrArray *          handshakes,
206                      const struct in_addr * in_addr )
207{
208    return tr_ptrArrayFindSorted( handshakes,
209                                  in_addr,
210                                  handshakeCompareToAddr );
211}
212
213static int
214comparePeerAtomToAddress( const void * va,
215                          const void * vb )
216{
217    const struct peer_atom * a = va;
218
219    return compareAddresses( &a->addr, vb );
220}
221
222static int
223comparePeerAtoms( const void * va,
224                  const void * vb )
225{
226    const struct peer_atom * b = vb;
227
228    return comparePeerAtomToAddress( va, &b->addr );
229}
230
231/**
232***
233**/
234
235static int
236torrentCompare( const void * va,
237                const void * vb )
238{
239    const Torrent * a = va;
240    const Torrent * b = vb;
241
242    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
243}
244
245static int
246torrentCompareToHash( const void * va,
247                      const void * vb )
248{
249    const Torrent * a = va;
250    const uint8_t * b_hash = vb;
251
252    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
253}
254
255static Torrent*
256getExistingTorrent( tr_peerMgr *    manager,
257                    const uint8_t * hash )
258{
259    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
260                                             hash,
261                                             torrentCompareToHash );
262}
263
264static int
265peerCompare( const void * va,
266             const void * vb )
267{
268    const tr_peer * a = va;
269    const tr_peer * b = vb;
270
271    return compareAddresses( &a->in_addr, &b->in_addr );
272}
273
274static int
275peerCompareToAddr( const void * va,
276                   const void * vb )
277{
278    const tr_peer * a = va;
279
280    return compareAddresses( &a->in_addr, vb );
281}
282
283static tr_peer*
284getExistingPeer( Torrent *              torrent,
285                 const struct in_addr * in_addr )
286{
287    assert( torrentIsLocked( torrent ) );
288    assert( in_addr );
289
290    return tr_ptrArrayFindSorted( torrent->peers,
291                                  in_addr,
292                                  peerCompareToAddr );
293}
294
295static struct peer_atom*
296getExistingAtom( const                  Torrent * t,
297                 const struct in_addr * addr )
298{
299    assert( torrentIsLocked( t ) );
300    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
301}
302
303static int
304peerIsInUse( const Torrent *        ct,
305             const struct in_addr * addr )
306{
307    Torrent * t = (Torrent*) ct;
308
309    assert( torrentIsLocked ( t ) );
310
311    return getExistingPeer( t, addr )
312           || getExistingHandshake( t->outgoingHandshakes, addr )
313           || getExistingHandshake( t->manager->incomingHandshakes, addr );
314}
315
316static tr_peer*
317peerConstructor( const struct in_addr * in_addr )
318{
319    tr_peer * p;
320
321    p = tr_new0( tr_peer, 1 );
322    memcpy( &p->in_addr, in_addr, sizeof( struct in_addr ) );
323    return p;
324}
325
326static tr_peer*
327getPeer( Torrent *              torrent,
328         const struct in_addr * in_addr )
329{
330    tr_peer * peer;
331
332    assert( torrentIsLocked( torrent ) );
333
334    peer = getExistingPeer( torrent, in_addr );
335
336    if( peer == NULL )
337    {
338        peer = peerConstructor( in_addr );
339        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
340    }
341
342    return peer;
343}
344
345static void
346peerDestructor( tr_peer * peer )
347{
348    assert( peer );
349    assert( peer->msgs );
350
351    tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
352    tr_peerMsgsFree( peer->msgs );
353
354    tr_peerIoFree( peer->io );
355
356    tr_bitfieldFree( peer->have );
357    tr_bitfieldFree( peer->blame );
358    tr_free( peer->client );
359    tr_free( peer );
360}
361
362static void
363removePeer( Torrent * t,
364            tr_peer * peer )
365{
366    tr_peer *          removed;
367    struct peer_atom * atom;
368
369    assert( torrentIsLocked( t ) );
370
371    atom = getExistingAtom( t, &peer->in_addr );
372    assert( atom );
373    atom->time = time( NULL );
374
375    removed = tr_ptrArrayRemoveSorted( t->peers, peer, peerCompare );
376    assert( removed == peer );
377    peerDestructor( removed );
378}
379
380static void
381removeAllPeers( Torrent * t )
382{
383    while( !tr_ptrArrayEmpty( t->peers ) )
384        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
385}
386
387static void
388torrentDestructor( void * vt )
389{
390    Torrent * t = vt;
391    uint8_t   hash[SHA_DIGEST_LENGTH];
392
393    assert( t );
394    assert( !t->isRunning );
395    assert( t->peers );
396    assert( torrentIsLocked( t ) );
397    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
398    assert( tr_ptrArrayEmpty( t->peers ) );
399
400    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
401
402    tr_timerFree( &t->reconnectTimer );
403    tr_timerFree( &t->rechokeTimer );
404    tr_timerFree( &t->refillTimer );
405
406    tr_ptrArrayFree( t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
407    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
408    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
409    tr_ptrArrayFree( t->peers, NULL );
410
411    tr_free( t->pendingRequestCount );
412    tr_free( t );
413}
414
415static void peerCallbackFunc( void * vpeer,
416                              void * vevent,
417                              void * vt );
418
419static Torrent*
420torrentConstructor( tr_peerMgr * manager,
421                    tr_torrent * tor )
422{
423    int       i;
424    Torrent * t;
425
426    t = tr_new0( Torrent, 1 );
427    t->manager = manager;
428    t->tor = tor;
429    t->pool = tr_ptrArrayNew( );
430    t->peers = tr_ptrArrayNew( );
431    t->webseeds = tr_ptrArrayNew( );
432    t->outgoingHandshakes = tr_ptrArrayNew( );
433    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
434
435    for( i = 0; i < tor->info.webseedCount; ++i )
436    {
437        tr_webseed * w =
438            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc,
439                           t );
440        tr_ptrArrayAppend( t->webseeds, w );
441    }
442
443    return t;
444}
445
446/**
447 * For explanation, see http://www.bittorrent.org/fast_extensions.html
448 * Also see the "test-allowed-set" unit test
449 *
450 * @param k number of pieces in set
451 * @param sz number of pieces in the torrent
452 * @param infohash torrent's SHA1 hash
453 * @param ip peer's address
454 */
455struct tr_bitfield *
456tr_peerMgrGenerateAllowedSet(
457    const uint32_t         k,
458    const uint32_t         sz,
459    const                  uint8_t        *
460                           infohash,
461    const struct in_addr * ip )
462{
463    uint8_t       w[SHA_DIGEST_LENGTH + 4];
464    uint8_t       x[SHA_DIGEST_LENGTH];
465    tr_bitfield * a;
466    uint32_t      a_size;
467
468    *(uint32_t*)w = ntohl( htonl( ip->s_addr ) & 0xffffff00 );   /* (1) */
469    memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );              /* (2) */
470    tr_sha1( x, w, sizeof( w ), NULL );                        /* (3) */
471
472    a = tr_bitfieldNew( sz );
473    a_size = 0;
474
475    while( a_size < k )
476    {
477        int i;
478        for( i = 0; i < 5 && a_size < k; ++i )                      /* (4) */
479        {
480            uint32_t j = i * 4;                                /* (5) */
481            uint32_t y = ntohl( *( uint32_t* )( x + j ) );             /* (6) */
482            uint32_t index = y % sz;                           /* (7) */
483            if( !tr_bitfieldHas( a, index ) )                  /* (8) */
484            {
485                tr_bitfieldAdd( a, index );                    /* (9) */
486                ++a_size;
487            }
488        }
489        tr_sha1( x, x, sizeof( x ), NULL );                    /* (3) */
490    }
491
492    return a;
493}
494
495static int bandwidthPulse( void * vmgr );
496
497tr_peerMgr*
498tr_peerMgrNew( tr_session * session )
499{
500    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
501
502    m->session = session;
503    m->torrents = tr_ptrArrayNew( );
504    m->incomingHandshakes = tr_ptrArrayNew( );
505    m->bandwidthPulseNumber = -1;
506    m->bandwidthTimer = tr_timerNew( session, bandwidthPulse,
507                                     m, 1000 / BANDWIDTH_PULSES_PER_SECOND );
508    return m;
509}
510
511void
512tr_peerMgrFree( tr_peerMgr * manager )
513{
514    managerLock( manager );
515
516    tr_timerFree( &manager->bandwidthTimer );
517
518    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
519     * the item from manager->handshakes, so this is a little roundabout... */
520    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
521        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
522
523    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
524
525    /* free the torrents. */
526    tr_ptrArrayFree( manager->torrents, torrentDestructor );
527
528    managerUnlock( manager );
529    tr_free( manager );
530}
531
532static tr_peer**
533getConnectedPeers( Torrent * t,
534                   int *     setmeCount )
535{
536    int       i, peerCount, connectionCount;
537    tr_peer **peers;
538    tr_peer **ret;
539
540    assert( torrentIsLocked( t ) );
541
542    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
543    ret = tr_new( tr_peer *, peerCount );
544
545    for( i = connectionCount = 0; i < peerCount; ++i )
546        if( peers[i]->msgs )
547            ret[connectionCount++] = peers[i];
548
549    *setmeCount = connectionCount;
550    return ret;
551}
552
553static int
554clientIsDownloadingFrom( const tr_peer * peer )
555{
556    return peer->clientIsInterested && !peer->clientIsChoked;
557}
558
559static int
560clientIsUploadingTo( const tr_peer * peer )
561{
562    return peer->peerIsInterested && !peer->peerIsChoked;
563}
564
565/***
566****
567***/
568
569int
570tr_peerMgrPeerIsSeed( const tr_peerMgr *     mgr,
571                      const uint8_t *        torrentHash,
572                      const struct in_addr * addr )
573{
574    int                      isSeed = FALSE;
575    const Torrent *          t = NULL;
576    const struct peer_atom * atom = NULL;
577
578    t = getExistingTorrent( (tr_peerMgr*)mgr, torrentHash );
579    if( t )
580        atom = getExistingAtom( t, addr );
581    if( atom )
582        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
583
584    return isSeed;
585}
586
587/****
588*****
589*****  REFILL
590*****
591****/
592
593static void
594assertValidPiece( Torrent * t, tr_piece_index_t piece )
595{
596    assert( t );
597    assert( t->tor );
598    assert( piece < t->tor->info.pieceCount );
599}
600
601static int
602getPieceRequests( Torrent * t, tr_piece_index_t piece )
603{
604    assertValidPiece( t, piece );
605
606    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
607}
608
609static void
610incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
611{
612    assertValidPiece( t, piece );
613
614    if( t->pendingRequestCount == NULL )
615        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
616    t->pendingRequestCount[piece]++;
617}
618
619static void
620decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
621{
622    assertValidPiece( t, piece );
623
624    if( t->pendingRequestCount )
625        t->pendingRequestCount[piece]--;
626}
627
628struct tr_refill_piece
629{
630    tr_priority_t    priority;
631    uint32_t         piece;
632    uint32_t         peerCount;
633    int              missingBlockCount;
634    int              random;
635    int              pendingRequestCount;
636};
637
638static int
639compareRefillPiece( const void * aIn,
640                    const void * bIn )
641{
642    const struct tr_refill_piece * a = aIn;
643    const struct tr_refill_piece * b = bIn;
644
645    /* if one piece has a higher priority, it goes first */
646    if( a->priority != b->priority )
647        return a->priority > b->priority ? -1 : 1;
648
649    /* have a per-priority endgame */
650    if( a->pendingRequestCount != b->pendingRequestCount )
651        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
652
653    /* fewer missing pieces goes first */
654    if( a->missingBlockCount != b->missingBlockCount )
655        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
656
657    /* otherwise if one has fewer peers, it goes first */
658    if( a->peerCount != b->peerCount )
659        return a->peerCount < b->peerCount ? -1 : 1;
660
661    /* otherwise go with our random seed */
662    if( a->random != b->random )
663        return a->random < b->random ? -1 : 1;
664
665    return 0;
666}
667
668static tr_piece_index_t *
669getPreferredPieces( Torrent           * t,
670                    tr_piece_index_t  * pieceCount )
671{
672    const tr_torrent  * tor = t->tor;
673    const tr_info     * inf = &tor->info;
674    tr_piece_index_t    i;
675    tr_piece_index_t    poolSize = 0;
676    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
677    int                 peerCount;
678    tr_peer**           peers;
679
680    assert( torrentIsLocked( t ) );
681
682    peers = getConnectedPeers( t, &peerCount );
683
684    /* make a list of the pieces that we want but don't have */
685    for( i = 0; i < inf->pieceCount; ++i )
686        if( !tor->info.pieces[i].dnd && !tr_cpPieceIsComplete( tor->completion, i ) )
687            pool[poolSize++] = i;
688
689    /* sort the pool from most interesting to least... */
690    if( poolSize > 1 )
691    {
692        tr_piece_index_t j;
693        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
694
695        for( j = 0; j < poolSize; ++j )
696        {
697            int k;
698            const tr_piece_index_t piece = pool[j];
699            struct tr_refill_piece * setme = p + j;
700
701            setme->piece = piece;
702            setme->priority = inf->pieces[piece].priority;
703            setme->peerCount = 0;
704            setme->random = tr_cryptoWeakRandInt( INT_MAX );
705            setme->missingBlockCount = tr_cpMissingBlocksInPiece( tor->completion, piece );
706            setme->pendingRequestCount = getPieceRequests( t, piece );
707
708            for( k = 0; k < peerCount; ++k )
709            {
710                const tr_peer * peer = peers[k];
711                if( peer->peerIsInterested && !peer->clientIsChoked
712                  && tr_bitfieldHas( peer->have, piece ) )
713                    ++setme->peerCount;
714            }
715        }
716
717        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
718               compareRefillPiece );
719
720        for( j = 0; j < poolSize; ++j )
721            pool[j] = p[j].piece;
722
723        tr_free( p );
724    }
725
726    tr_free( peers );
727
728    *pieceCount = poolSize;
729    return pool;
730}
731
732struct tr_blockIterator
733{
734    Torrent * t;
735    tr_block_index_t blockIndex, blockCount, *blocks;
736    tr_piece_index_t pieceIndex, pieceCount, *pieces;
737};
738
739static struct tr_blockIterator*
740blockIteratorNew( Torrent * t )
741{
742    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
743    i->t = t;
744    i->pieces = getPreferredPieces( t, &i->pieceCount );
745    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCount );
746    return i;
747}
748
749static int
750blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
751{
752    int found;
753    Torrent * t = i->t;
754    tr_torrent * tor = t->tor;
755
756    while( ( i->blockIndex == i->blockCount )
757        && ( i->pieceIndex < i->pieceCount ) )
758    {
759        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
760        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
761        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
762        tr_block_index_t block;
763
764        assert( index < tor->info.pieceCount );
765
766        i->blockCount = 0;
767        i->blockIndex = 0;
768        for( block=b; block!=e; ++block )
769            if( !tr_cpBlockIsComplete( tor->completion, block ) )
770                i->blocks[i->blockCount++] = block;
771    }
772
773    if(( found = ( i->blockIndex < i->blockCount )))
774        *setme = i->blocks[i->blockIndex++];
775
776    return found;
777}
778
779static void
780blockIteratorFree( struct tr_blockIterator * i )
781{
782    tr_free( i->pieces );
783    tr_free( i->blocks );
784    tr_free( i );
785}
786
787static tr_peer**
788getPeersUploadingToClient( Torrent * t,
789                           int *     setmeCount )
790{
791    int j;
792    int peerCount = 0;
793    int retCount = 0;
794    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
795    tr_peer ** ret = tr_new( tr_peer *, peerCount );
796
797    j = 0; /* this is a temporary test to make sure we walk through all the peers */
798    if( peerCount )
799    {
800        /* Get a list of peers we're downloading from.
801           Pick a different starting point each time so all peers
802           get a chance at the first blocks in the queue */
803        const int fencepost = tr_cryptoWeakRandInt( peerCount );
804        int i = fencepost;
805        do {
806            if( clientIsDownloadingFrom( peers[i] ) )
807                ret[retCount++] = peers[i];
808            i = ( i + 1 ) % peerCount;
809            ++j;
810        } while( i != fencepost );
811    }
812    assert( j == peerCount );
813    *setmeCount = retCount;
814    return ret;
815}
816
817static int
818refillPulse( void * vtorrent )
819{
820    tr_block_index_t block;
821    int peerCount;
822    int webseedCount;
823    tr_peer ** peers;
824    tr_webseed ** webseeds;
825    struct tr_blockIterator * blockIterator;
826    Torrent * t = vtorrent;
827    tr_torrent * tor = t->tor;
828
829    if( !t->isRunning )
830        return TRUE;
831    if( tr_torrentIsSeed( t->tor ) )
832        return TRUE;
833
834    torrentLock( t );
835    tordbg( t, "Refilling Request Buffers..." );
836
837    blockIterator = blockIteratorNew( t );
838    peers = getPeersUploadingToClient( t, &peerCount );
839    webseedCount = tr_ptrArraySize( t->webseeds );
840    webseeds = tr_memdup( tr_ptrArrayBase( t->webseeds ),
841                          webseedCount * sizeof( tr_webseed* ) );
842
843    while( ( webseedCount || peerCount )
844        && blockIteratorNext( blockIterator, &block ) )
845    {
846        int j;
847        int handled = FALSE;
848
849        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
850        const uint32_t begin = (block * tor->blockSize) - (index * tor->info.pieceSize);
851        const uint32_t length = tr_torBlockCountBytes( tor, block );
852
853        assert( tr_torrentReqIsValid( tor, index, begin, length ) );
854        assert( _tr_block( tor, index, begin ) == block );
855        assert( begin < tr_torPieceCountBytes( tor, index ) );
856        assert( (begin + length) <= tr_torPieceCountBytes( tor, index ) );
857
858        /* find a peer who can ask for this block */
859        for( j=0; !handled && j<peerCount; )
860        {
861            const int val = tr_peerMsgsAddRequest( peers[j]->msgs, index, begin, length );
862            switch( val )
863            {
864                case TR_ADDREQ_FULL:
865                case TR_ADDREQ_CLIENT_CHOKED:
866                    peers[j] = peers[--peerCount];
867                    break;
868
869                case TR_ADDREQ_MISSING:
870                case TR_ADDREQ_DUPLICATE:
871                    ++j;
872                    break;
873
874                case TR_ADDREQ_OK:
875                    incrementPieceRequests( t, index );
876                    handled = TRUE;
877                    break;
878
879                default:
880                    assert( 0 && "unhandled value" );
881                    break;
882            }
883        }
884
885        /* maybe one of the webseeds can do it */
886        for( j=0; !handled && j<webseedCount; )
887        {
888            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j], index, begin, length );
889            switch( val )
890            {
891                case TR_ADDREQ_FULL:
892                    webseeds[j] = webseeds[--webseedCount];
893                    break;
894
895                case TR_ADDREQ_OK:
896                    incrementPieceRequests( t, index );
897                    handled = TRUE;
898                    break;
899
900                default:
901                    assert( 0 && "unhandled value" );
902                    break;
903            }
904        }
905    }
906
907    /* cleanup */
908    blockIteratorFree( blockIterator );
909    tr_free( webseeds );
910    tr_free( peers );
911
912    t->refillTimer = NULL;
913    torrentUnlock( t );
914    return FALSE;
915}
916
917static void
918broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
919{
920    int i, size;
921    tr_peer ** peers;
922
923    assert( torrentIsLocked( t ) );
924
925    peers = getConnectedPeers( t, &size );
926    for( i=0; i<size; ++i )
927        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
928    tr_free( peers );
929}
930
931static void
932addStrike( Torrent * t,
933           tr_peer * peer )
934{
935    tordbg( t, "increasing peer %s strike count to %d",
936            tr_peerIoAddrStr( &peer->in_addr,
937                              peer->port ), peer->strikes + 1 );
938
939    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
940    {
941        struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
942        atom->myflags |= MYFLAG_BANNED;
943        peer->doPurge = 1;
944        tordbg( t, "banning peer %s",
945               tr_peerIoAddrStr( &atom->addr, atom->port ) );
946    }
947}
948
949static void
950gotBadPiece( Torrent *        t,
951             tr_piece_index_t pieceIndex )
952{
953    tr_torrent *   tor = t->tor;
954    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
955
956    tor->corruptCur += byteCount;
957    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
958}
959
960static void
961refillSoon( Torrent * t )
962{
963    if( t->refillTimer == NULL )
964        t->refillTimer = tr_timerNew( t->manager->session,
965                                      refillPulse, t,
966                                      REFILL_PERIOD_MSEC );
967}
968
969static void
970peerCallbackFunc( void * vpeer,
971                  void * vevent,
972                  void * vt )
973{
974    tr_peer *             peer = vpeer; /* may be NULL if peer is a webseed */
975    Torrent *             t = (Torrent *) vt;
976    const tr_peer_event * e = vevent;
977
978    torrentLock( t );
979
980    switch( e->eventType )
981    {
982        case TR_PEER_NEED_REQ:
983            refillSoon( t );
984            break;
985
986        case TR_PEER_CANCEL:
987            decrementPieceRequests( t, e->pieceIndex );
988            break;
989
990        case TR_PEER_PEER_GOT_DATA:
991        {
992            const time_t now = time( NULL );
993            tr_torrent * tor = t->tor;
994            tor->activityDate = now;
995            tor->uploadedCur += e->length;
996            tr_statsAddUploaded( tor->session, e->length );
997            if( peer )
998            {
999                struct peer_atom * a = getExistingAtom( t, &peer->in_addr );
1000                a->piece_data_time = now;
1001            }
1002            break;
1003        }
1004
1005        case TR_PEER_CLIENT_GOT_DATA:
1006        {
1007            const time_t now = time( NULL );
1008            tr_torrent * tor = t->tor;
1009            tor->activityDate = now;
1010            /* only add this to downloadedCur if we got it from a peer --
1011             * webseeds shouldn't count against our ratio.  As one tracker
1012             * admin put it, "Those pieces are downloaded directly from the
1013             * content distributor, not the peers, it is the tracker's job
1014             * to manage the swarms, not the web server and does not fit
1015             * into the jurisdiction of the tracker." */
1016            if( peer )
1017                tor->downloadedCur += e->length;
1018            tr_statsAddDownloaded( tor->session, e->length );
1019            if( peer ) {
1020                struct peer_atom * a = getExistingAtom( t, &peer->in_addr );
1021                a->piece_data_time = now;
1022            }
1023            break;
1024        }
1025
1026        case TR_PEER_PEER_PROGRESS:
1027        {
1028            if( peer )
1029            {
1030                struct peer_atom * atom = getExistingAtom( t,
1031                                                           &peer->in_addr );
1032                const int          peerIsSeed = e->progress >= 1.0;
1033                if( peerIsSeed )
1034                {
1035                    tordbg( t, "marking peer %s as a seed",
1036                           tr_peerIoAddrStr( &atom->addr,
1037                                             atom->port ) );
1038                    atom->flags |= ADDED_F_SEED_FLAG;
1039                }
1040                else
1041                {
1042                    tordbg( t, "marking peer %s as a non-seed",
1043                           tr_peerIoAddrStr( &atom->addr,
1044                                             atom->port ) );
1045                    atom->flags &= ~ADDED_F_SEED_FLAG;
1046                }
1047            }
1048            break;
1049        }
1050
1051        case TR_PEER_CLIENT_GOT_BLOCK:
1052        {
1053            tr_torrent *     tor = t->tor;
1054
1055            tr_block_index_t block = _tr_block( tor, e->pieceIndex,
1056                                                e->offset );
1057
1058            tr_cpBlockAdd( tor->completion, block );
1059            decrementPieceRequests( t, e->pieceIndex );
1060
1061            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1062
1063            if( tr_cpPieceIsComplete( tor->completion, e->pieceIndex ) )
1064            {
1065                const tr_piece_index_t p = e->pieceIndex;
1066                const int              ok = tr_ioTestPiece( tor, p );
1067
1068                if( !ok )
1069                {
1070                    tr_torerr( tor,
1071                              _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1072                              (unsigned long)p );
1073                }
1074
1075                tr_torrentSetHasPiece( tor, p, ok );
1076                tr_torrentSetPieceChecked( tor, p, TRUE );
1077                tr_peerMgrSetBlame( tor->session->peerMgr, tor->info.hash, p, ok );
1078
1079                if( !ok )
1080                    gotBadPiece( t, p );
1081                else
1082                {
1083                    int        i, peerCount;
1084                    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1085                    for( i = 0; i < peerCount; ++i )
1086                        tr_peerMsgsHave( peers[i]->msgs, p );
1087                    tr_free( peers );
1088                }
1089
1090                tr_torrentRecheckCompleteness( tor );
1091            }
1092            break;
1093        }
1094
1095        case TR_PEER_ERROR:
1096            if( e->err == EINVAL )
1097            {
1098                addStrike( t, peer );
1099                peer->doPurge = 1;
1100            }
1101            else if( ( e->err == ERANGE )
1102                  || ( e->err == EMSGSIZE )
1103                  || ( e->err == ENOTCONN ) )
1104            {
1105                /* some protocol error from the peer */
1106                peer->doPurge = 1;
1107            }
1108            else /* a local error, such as an IO error */
1109            {
1110                t->tor->error = e->err;
1111                tr_strlcpy( t->tor->errorString,
1112                            tr_strerror( t->tor->error ),
1113                            sizeof( t->tor->errorString ) );
1114                tr_torrentStop( t->tor );
1115            }
1116            break;
1117
1118        default:
1119            assert( 0 );
1120    }
1121
1122    torrentUnlock( t );
1123}
1124
1125static void
1126ensureAtomExists( Torrent *              t,
1127                  const struct in_addr * addr,
1128                  uint16_t               port,
1129                  uint8_t                flags,
1130                  uint8_t                from )
1131{
1132    if( getExistingAtom( t, addr ) == NULL )
1133    {
1134        struct peer_atom * a;
1135        a = tr_new0( struct peer_atom, 1 );
1136        a->addr = *addr;
1137        a->port = port;
1138        a->flags = flags;
1139        a->from = from;
1140        tordbg( t, "got a new atom: %s",
1141               tr_peerIoAddrStr( &a->addr, a->port ) );
1142        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
1143    }
1144}
1145
1146static int
1147getMaxPeerCount( const tr_torrent * tor )
1148{
1149    return tor->maxConnectedPeers;
1150}
1151
1152static int
1153getPeerCount( const Torrent * t )
1154{
1155    return tr_ptrArraySize( t->peers ) + tr_ptrArraySize(
1156               t->outgoingHandshakes );
1157}
1158
1159/* FIXME: this is kind of a mess. */
1160static int
1161myHandshakeDoneCB( tr_handshake *  handshake,
1162                   tr_peerIo *     io,
1163                   int             isConnected,
1164                   const uint8_t * peer_id,
1165                   void *          vmanager )
1166{
1167    int                    ok = isConnected;
1168    int                    success = FALSE;
1169    uint16_t               port;
1170    const struct in_addr * addr;
1171    tr_peerMgr *           manager = (tr_peerMgr*) vmanager;
1172    Torrent *              t;
1173    tr_handshake *         ours;
1174
1175    assert( io );
1176    assert( isConnected == 0 || isConnected == 1 );
1177
1178    t = tr_peerIoHasTorrentHash( io )
1179        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1180        : NULL;
1181
1182    if( tr_peerIoIsIncoming ( io ) )
1183        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
1184                                        handshake, handshakeCompare );
1185    else if( t )
1186        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
1187                                        handshake, handshakeCompare );
1188    else
1189        ours = handshake;
1190
1191    assert( ours );
1192    assert( ours == handshake );
1193
1194    if( t )
1195        torrentLock( t );
1196
1197    addr = tr_peerIoGetAddress( io, &port );
1198
1199    if( !ok || !t || !t->isRunning )
1200    {
1201        if( t )
1202        {
1203            struct peer_atom * atom = getExistingAtom( t, addr );
1204            if( atom )
1205                ++atom->numFails;
1206        }
1207
1208        tr_peerIoFree( io );
1209    }
1210    else /* looking good */
1211    {
1212        struct peer_atom * atom;
1213        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1214        atom = getExistingAtom( t, addr );
1215        atom->time = time( NULL );
1216
1217        if( atom->myflags & MYFLAG_BANNED )
1218        {
1219            tordbg( t, "banned peer %s tried to reconnect",
1220                   tr_peerIoAddrStr( &atom->addr,
1221                                     atom->port ) );
1222            tr_peerIoFree( io );
1223        }
1224        else if( tr_peerIoIsIncoming( io )
1225               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1226
1227        {
1228            tr_peerIoFree( io );
1229        }
1230        else
1231        {
1232            tr_peer * peer = getExistingPeer( t, addr );
1233
1234            if( peer ) /* we already have this peer */
1235            {
1236                tr_peerIoFree( io );
1237            }
1238            else
1239            {
1240                peer = getPeer( t, addr );
1241                tr_free( peer->client );
1242
1243                if( !peer_id )
1244                    peer->client = NULL;
1245                else
1246                {
1247                    char client[128];
1248                    tr_clientForId( client, sizeof( client ), peer_id );
1249                    peer->client = tr_strdup( client );
1250                }
1251                peer->port = port;
1252                peer->io = io;
1253                peer->msgs =
1254                    tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t,
1255                                    &peer->msgsTag );
1256
1257                success = TRUE;
1258            }
1259        }
1260    }
1261
1262    if( t )
1263        torrentUnlock( t );
1264
1265    return success;
1266}
1267
1268void
1269tr_peerMgrAddIncoming( tr_peerMgr *     manager,
1270                       struct in_addr * addr,
1271                       uint16_t         port,
1272                       int              socket )
1273{
1274    managerLock( manager );
1275
1276    if( tr_sessionIsAddressBlocked( manager->session, addr ) )
1277    {
1278        tr_dbg( "Banned IP address \"%s\" tried to connect to us",
1279               inet_ntoa( *addr ) );
1280        tr_netClose( socket );
1281    }
1282    else if( getExistingHandshake( manager->incomingHandshakes, addr ) )
1283    {
1284        tr_netClose( socket );
1285    }
1286    else /* we don't have a connetion to them yet... */
1287    {
1288        tr_peerIo *    io;
1289        tr_handshake * handshake;
1290
1291        io = tr_peerIoNewIncoming( manager->session, addr, port, socket );
1292
1293        handshake = tr_handshakeNew( io,
1294                                     manager->session->encryptionMode,
1295                                     myHandshakeDoneCB,
1296                                     manager );
1297
1298        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake,
1299                                 handshakeCompare );
1300    }
1301
1302    managerUnlock( manager );
1303}
1304
1305void
1306tr_peerMgrAddPex( tr_peerMgr *    manager,
1307                  const uint8_t * torrentHash,
1308                  uint8_t         from,
1309                  const tr_pex *  pex )
1310{
1311    Torrent * t;
1312
1313    managerLock( manager );
1314
1315    t = getExistingTorrent( manager, torrentHash );
1316    if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->in_addr ) )
1317        ensureAtomExists( t, &pex->in_addr, pex->port, pex->flags, from );
1318
1319    managerUnlock( manager );
1320}
1321
1322tr_pex *
1323tr_peerMgrCompactToPex( const void *    compact,
1324                        size_t          compactLen,
1325                        const uint8_t * added_f,
1326                        size_t          added_f_len,
1327                        size_t *        pexCount )
1328{
1329    size_t          i;
1330    size_t          n = compactLen / 6;
1331    const uint8_t * walk = compact;
1332    tr_pex *        pex = tr_new0( tr_pex, n );
1333
1334    for( i = 0; i < n; ++i )
1335    {
1336        memcpy( &pex[i].in_addr, walk, 4 ); walk += 4;
1337        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1338        if( added_f && ( n == added_f_len ) )
1339            pex[i].flags = added_f[i];
1340    }
1341
1342    *pexCount = n;
1343    return pex;
1344}
1345
1346/**
1347***
1348**/
1349
1350void
1351tr_peerMgrSetBlame( tr_peerMgr *     manager,
1352                    const uint8_t *  torrentHash,
1353                    tr_piece_index_t pieceIndex,
1354                    int              success )
1355{
1356    if( !success )
1357    {
1358        int        peerCount, i;
1359        Torrent *  t = getExistingTorrent( manager, torrentHash );
1360        tr_peer ** peers;
1361
1362        assert( torrentIsLocked( t ) );
1363
1364        peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1365        for( i = 0; i < peerCount; ++i )
1366        {
1367            tr_peer * peer = peers[i];
1368            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1369            {
1370                tordbg(
1371                    t,
1372                    "peer %s contributed to corrupt piece (%d); now has %d strikes",
1373                    tr_peerIoAddrStr( &peer->in_addr, peer->port ),
1374                    pieceIndex, (int)peer->strikes + 1 );
1375                addStrike( t, peer );
1376            }
1377        }
1378    }
1379}
1380
1381int
1382tr_pexCompare( const void * va,
1383               const void * vb )
1384{
1385    const tr_pex * a = va;
1386    const tr_pex * b = vb;
1387    int            i =
1388        memcmp( &a->in_addr, &b->in_addr, sizeof( struct in_addr ) );
1389
1390    if( i ) return i;
1391    if( a->port < b->port ) return -1;
1392    if( a->port > b->port ) return 1;
1393    return 0;
1394}
1395
1396int tr_pexCompare( const void * a,
1397                   const void * b );
1398
1399static int
1400peerPrefersCrypto( const tr_peer * peer )
1401{
1402    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1403        return TRUE;
1404
1405    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1406        return FALSE;
1407
1408    return tr_peerIoIsEncrypted( peer->io );
1409}
1410
1411int
1412tr_peerMgrGetPeers( tr_peerMgr *    manager,
1413                    const uint8_t * torrentHash,
1414                    tr_pex **       setme_pex )
1415{
1416    int peerCount = 0;
1417    const Torrent *  t;
1418
1419    managerLock( manager );
1420
1421    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1422    if( !t )
1423    {
1424        *setme_pex = NULL;
1425    }
1426    else
1427    {
1428        int i;
1429        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1430        tr_pex * pex = tr_new( tr_pex, peerCount );
1431        tr_pex * walk = pex;
1432
1433        for( i = 0; i < peerCount; ++i, ++walk )
1434        {
1435            const tr_peer * peer = peers[i];
1436            walk->in_addr = peer->in_addr;
1437            walk->port = peer->port;
1438            walk->flags = 0;
1439            if( peerPrefersCrypto( peer ) ) walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1440            if( peer->progress >= 1.0 ) walk->flags |= ADDED_F_SEED_FLAG;
1441        }
1442
1443        assert( ( walk - pex ) == peerCount );
1444        qsort( pex, peerCount, sizeof( tr_pex ), tr_pexCompare );
1445        *setme_pex = pex;
1446    }
1447
1448    managerUnlock( manager );
1449    return peerCount;
1450}
1451
1452static int reconnectPulse( void * vtorrent );
1453
1454static int rechokePulse( void * vtorrent );
1455
1456void
1457tr_peerMgrStartTorrent( tr_peerMgr *    manager,
1458                        const uint8_t * torrentHash )
1459{
1460    Torrent * t;
1461
1462    managerLock( manager );
1463
1464    t = getExistingTorrent( manager, torrentHash );
1465
1466    assert( t );
1467    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1468    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1469
1470    if( !t->isRunning )
1471    {
1472        t->isRunning = 1;
1473
1474        t->reconnectTimer = tr_timerNew( t->manager->session,
1475                                         reconnectPulse, t,
1476                                         RECONNECT_PERIOD_MSEC );
1477
1478        t->rechokeTimer = tr_timerNew( t->manager->session,
1479                                       rechokePulse, t,
1480                                       RECHOKE_PERIOD_MSEC );
1481
1482        reconnectPulse( t );
1483
1484        rechokePulse( t );
1485
1486        if( !tr_ptrArrayEmpty( t->webseeds ) )
1487            refillSoon( t );
1488    }
1489
1490    managerUnlock( manager );
1491}
1492
1493static void
1494stopTorrent( Torrent * t )
1495{
1496    assert( torrentIsLocked( t ) );
1497
1498    t->isRunning = 0;
1499    tr_timerFree( &t->rechokeTimer );
1500    tr_timerFree( &t->reconnectTimer );
1501
1502    /* disconnect the peers. */
1503    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)peerDestructor );
1504    tr_ptrArrayClear( t->peers );
1505
1506    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1507     * which removes the handshake from t->outgoingHandshakes... */
1508    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1509        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1510}
1511
1512void
1513tr_peerMgrStopTorrent( tr_peerMgr *    manager,
1514                       const uint8_t * torrentHash )
1515{
1516    managerLock( manager );
1517
1518    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1519
1520    managerUnlock( manager );
1521}
1522
1523void
1524tr_peerMgrAddTorrent( tr_peerMgr * manager,
1525                      tr_torrent * tor )
1526{
1527    Torrent * t;
1528
1529    managerLock( manager );
1530
1531    assert( tor );
1532    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1533
1534    t = torrentConstructor( manager, tor );
1535    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1536
1537    managerUnlock( manager );
1538}
1539
1540void
1541tr_peerMgrRemoveTorrent( tr_peerMgr *    manager,
1542                         const uint8_t * torrentHash )
1543{
1544    Torrent * t;
1545
1546    managerLock( manager );
1547
1548    t = getExistingTorrent( manager, torrentHash );
1549    assert( t );
1550    stopTorrent( t );
1551    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1552    torrentDestructor( t );
1553
1554    managerUnlock( manager );
1555}
1556
1557void
1558tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1559                               const uint8_t *    torrentHash,
1560                               int8_t *           tab,
1561                               unsigned int       tabCount )
1562{
1563    tr_piece_index_t   i;
1564    const Torrent *    t;
1565    const tr_torrent * tor;
1566    float              interval;
1567    int                isComplete;
1568    int                peerCount;
1569    const tr_peer **   peers;
1570
1571    managerLock( manager );
1572
1573    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1574    tor = t->tor;
1575    interval = tor->info.pieceCount / (float)tabCount;
1576    isComplete = tor
1577                 && ( tr_cpGetStatus ( tor->completion ) == TR_CP_COMPLETE );
1578    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1579
1580    memset( tab, 0, tabCount );
1581
1582    for( i = 0; tor && i < tabCount; ++i )
1583    {
1584        const int piece = i * interval;
1585
1586        if( isComplete || tr_cpPieceIsComplete( tor->completion, piece ) )
1587            tab[i] = -1;
1588        else if( peerCount )
1589        {
1590            int j;
1591            for( j = 0; j < peerCount; ++j )
1592                if( tr_bitfieldHas( peers[j]->have, i ) )
1593                    ++tab[i];
1594        }
1595    }
1596
1597    managerUnlock( manager );
1598}
1599
1600/* Returns the pieces that are available from peers */
1601tr_bitfield*
1602tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1603                        const uint8_t *    torrentHash )
1604{
1605    int           i, size;
1606    Torrent *     t;
1607    tr_peer **    peers;
1608    tr_bitfield * pieces;
1609
1610    managerLock( manager );
1611
1612    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1613    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1614    peers = getConnectedPeers( t, &size );
1615    for( i = 0; i < size; ++i )
1616        tr_bitfieldOr( pieces, peers[i]->have );
1617
1618    managerUnlock( manager );
1619    tr_free( peers );
1620    return pieces;
1621}
1622
1623int
1624tr_peerMgrHasConnections( const tr_peerMgr * manager,
1625                          const uint8_t *    torrentHash )
1626{
1627    int             ret;
1628    const Torrent * t;
1629
1630    managerLock( manager );
1631
1632    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1633    ret = t && ( !tr_ptrArrayEmpty( t->peers )
1634               || !tr_ptrArrayEmpty( t->webseeds ) );
1635
1636    managerUnlock( manager );
1637    return ret;
1638}
1639
1640void
1641tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1642                        const uint8_t *    torrentHash,
1643                        int *              setmePeersKnown,
1644                        int *              setmePeersConnected,
1645                        int *              setmeSeedsConnected,
1646                        int *              setmeWebseedsSendingToUs,
1647                        int *              setmePeersSendingToUs,
1648                        int *              setmePeersGettingFromUs,
1649                        int *              setmePeersFrom )
1650{
1651    int                 i, size;
1652    const Torrent *     t;
1653    const tr_peer **    peers;
1654    const tr_webseed ** webseeds;
1655
1656    managerLock( manager );
1657
1658    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1659    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1660
1661    *setmePeersKnown           = tr_ptrArraySize( t->pool );
1662    *setmePeersConnected       = 0;
1663    *setmeSeedsConnected       = 0;
1664    *setmePeersGettingFromUs   = 0;
1665    *setmePeersSendingToUs     = 0;
1666    *setmeWebseedsSendingToUs  = 0;
1667
1668    for( i = 0; i < TR_PEER_FROM__MAX; ++i )
1669        setmePeersFrom[i] = 0;
1670
1671    for( i = 0; i < size; ++i )
1672    {
1673        const tr_peer *          peer = peers[i];
1674        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1675
1676        if( peer->io == NULL ) /* not connected */
1677            continue;
1678
1679        ++ * setmePeersConnected;
1680
1681        ++setmePeersFrom[atom->from];
1682
1683        if( clientIsDownloadingFrom( peer ) )
1684            ++ * setmePeersSendingToUs;
1685
1686        if( clientIsUploadingTo( peer ) )
1687            ++ * setmePeersGettingFromUs;
1688
1689        if( atom->flags & ADDED_F_SEED_FLAG )
1690            ++ * setmeSeedsConnected;
1691    }
1692
1693    webseeds = (const tr_webseed **) tr_ptrArrayPeek( t->webseeds, &size );
1694    for( i = 0; i < size; ++i )
1695    {
1696        if( tr_webseedIsActive( webseeds[i] ) )
1697            ++ * setmeWebseedsSendingToUs;
1698    }
1699
1700    managerUnlock( manager );
1701}
1702
1703double
1704tr_peerMgrGetRate( const tr_peerMgr * manager,
1705                   tr_direction       direction )
1706{
1707    int    i;
1708    double bytes = 0;
1709
1710    assert( manager != NULL );
1711    assert( direction == TR_UP || direction == TR_DOWN );
1712
1713    for( i = 0; i < BANDWIDTH_PULSE_HISTORY; ++i )
1714        bytes += manager->rateHistory[direction][i];
1715
1716    return ( BANDWIDTH_PULSES_PER_SECOND * bytes )
1717           / ( BANDWIDTH_PULSE_HISTORY * 1024 );
1718}
1719
1720float*
1721tr_peerMgrWebSpeeds( const tr_peerMgr * manager,
1722                     const uint8_t *    torrentHash )
1723{
1724    const Torrent *     t;
1725    const tr_webseed ** webseeds;
1726    int                 i;
1727    int                 webseedCount;
1728    float *             ret;
1729
1730    assert( manager );
1731    managerLock( manager );
1732
1733    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1734    webseeds = (const tr_webseed**) tr_ptrArrayPeek( t->webseeds,
1735                                                     &webseedCount );
1736    assert( webseedCount == t->tor->info.webseedCount );
1737    ret = tr_new0( float, webseedCount );
1738
1739    for( i = 0; i < webseedCount; ++i )
1740        if( !tr_webseedGetSpeed( webseeds[i], &ret[i] ) )
1741            ret[i] = -1.0;
1742
1743    managerUnlock( manager );
1744    return ret;
1745}
1746
1747struct tr_peer_stat *
1748tr_peerMgrPeerStats( const   tr_peerMgr  * manager,
1749                     const   uint8_t     * torrentHash,
1750                     int                 * setmeCount UNUSED )
1751{
1752    int             i, size;
1753    const Torrent * t;
1754    tr_peer **      peers;
1755    tr_peer_stat *  ret;
1756
1757    assert( manager );
1758    managerLock( manager );
1759
1760    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1761    peers = getConnectedPeers( (Torrent*)t, &size );
1762    ret = tr_new0( tr_peer_stat, size );
1763
1764    for( i = 0; i < size; ++i )
1765    {
1766        char *                   pch;
1767        const tr_peer *          peer = peers[i];
1768        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1769        tr_peer_stat *           stat = ret + i;
1770
1771        tr_netNtop( &peer->in_addr, stat->addr, sizeof( stat->addr ) );
1772        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
1773                   sizeof( stat->client ) );
1774        stat->port               = peer->port;
1775        stat->from               = atom->from;
1776        stat->progress           = peer->progress;
1777        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1778        stat->rateToPeer         = tr_peerIoGetRateToPeer( peer->io );
1779        stat->rateToClient       = tr_peerIoGetRateToClient( peer->io );
1780        stat->peerIsChoked       = peer->peerIsChoked;
1781        stat->peerIsInterested   = peer->peerIsInterested;
1782        stat->clientIsChoked     = peer->clientIsChoked;
1783        stat->clientIsInterested = peer->clientIsInterested;
1784        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1785        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1786        stat->isUploadingTo      = clientIsUploadingTo( peer );
1787
1788        pch = stat->flagStr;
1789        if( t->optimistic == peer ) *pch++ = 'O';
1790        if( stat->isDownloadingFrom ) *pch++ = 'D';
1791        else if( stat->clientIsInterested ) *pch++ = 'd';
1792        if( stat->isUploadingTo ) *pch++ = 'U';
1793        else if( stat->peerIsInterested ) *pch++ = 'u';
1794        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ =
1795                'K';
1796        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1797        if( stat->isEncrypted ) *pch++ = 'E';
1798        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1799        if( stat->isIncoming ) *pch++ = 'I';
1800        *pch = '\0';
1801    }
1802
1803    *setmeCount = size;
1804    tr_free( peers );
1805
1806    managerUnlock( manager );
1807    return ret;
1808}
1809
1810/**
1811***
1812**/
1813
1814struct ChokeData
1815{
1816    unsigned int    doUnchoke    : 1;
1817    unsigned int    isInterested : 1;
1818    double          rateToClient;
1819    double          rateToPeer;
1820    tr_peer *       peer;
1821};
1822
1823static int
1824tr_compareDouble( double a,
1825                  double b )
1826{
1827    if( a < b ) return -1;
1828    if( a > b ) return 1;
1829    return 0;
1830}
1831
1832static int
1833compareChoke( const void * va,
1834              const void * vb )
1835{
1836    const struct ChokeData * a = va;
1837    const struct ChokeData * b = vb;
1838    int                      diff = 0;
1839
1840    if( diff == 0 ) /* prefer higher dl speeds */
1841        diff = -tr_compareDouble( a->rateToClient, b->rateToClient );
1842    if( diff == 0 ) /* prefer higher ul speeds */
1843        diff = -tr_compareDouble( a->rateToPeer, b->rateToPeer );
1844    if( diff == 0 ) /* prefer unchoked */
1845        diff = (int)a->peer->peerIsChoked - (int)b->peer->peerIsChoked;
1846
1847    return diff;
1848}
1849
1850static int
1851isNew( const tr_peer * peer )
1852{
1853    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1854}
1855
1856static int
1857isSame( const tr_peer * peer )
1858{
1859    return peer && peer->client && strstr( peer->client, "Transmission" );
1860}
1861
1862/**
1863***
1864**/
1865
1866static void
1867rechoke( Torrent * t )
1868{
1869    int                i, peerCount, size, unchokedInterested;
1870    tr_peer **         peers = getConnectedPeers( t, &peerCount );
1871    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1872
1873    assert( torrentIsLocked( t ) );
1874
1875    /* sort the peers by preference and rate */
1876    for( i = 0, size = 0; i < peerCount; ++i )
1877    {
1878        tr_peer * peer = peers[i];
1879        if( peer->progress >= 1.0 ) /* choke all seeds */
1880            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1881        else
1882        {
1883            struct ChokeData * node = &choke[size++];
1884            node->peer = peer;
1885            node->isInterested = peer->peerIsInterested;
1886            node->rateToClient = tr_peerIoGetRateToClient( peer->io );
1887            node->rateToPeer = tr_peerIoGetRateToPeer( peer->io );
1888        }
1889    }
1890
1891    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
1892
1893    /**
1894     * Reciprocation and number of uploads capping is managed by unchoking
1895     * the N peers which have the best upload rate and are interested.
1896     * This maximizes the client's download rate. These N peers are
1897     * referred to as downloaders, because they are interested in downloading
1898     * from the client.
1899     *
1900     * Peers which have a better upload rate (as compared to the downloaders)
1901     * but aren't interested get unchoked. If they become interested, the
1902     * downloader with the worst upload rate gets choked. If a client has
1903     * a complete file, it uses its upload rate rather than its download
1904     * rate to decide which peers to unchoke.
1905     */
1906    unchokedInterested = 0;
1907    for( i = 0; i < size && unchokedInterested < MAX_UNCHOKED_PEERS; ++i )
1908    {
1909        choke[i].doUnchoke = 1;
1910        if( choke[i].isInterested )
1911            ++unchokedInterested;
1912    }
1913
1914    /* optimistic unchoke */
1915    if( i < size )
1916    {
1917        int                n;
1918        struct ChokeData * c;
1919        tr_ptrArray *      randPool = tr_ptrArrayNew( );
1920
1921        for( ; i < size; ++i )
1922        {
1923            if( choke[i].isInterested )
1924            {
1925                const tr_peer * peer = choke[i].peer;
1926                int             x = 1, y;
1927                if( isNew( peer ) ) x *= 3;
1928                if( isSame( peer ) ) x *= 3;
1929                for( y = 0; y < x; ++y )
1930                    tr_ptrArrayAppend( randPool, &choke[i] );
1931            }
1932        }
1933
1934        if( ( n = tr_ptrArraySize( randPool ) ) )
1935        {
1936            c = tr_ptrArrayNth( randPool, tr_cryptoWeakRandInt( n ) );
1937            c->doUnchoke = 1;
1938            t->optimistic = c->peer;
1939        }
1940
1941        tr_ptrArrayFree( randPool, NULL );
1942    }
1943
1944    for( i = 0; i < size; ++i )
1945        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1946
1947    /* cleanup */
1948    tr_free( choke );
1949    tr_free( peers );
1950}
1951
1952static int
1953rechokePulse( void * vtorrent )
1954{
1955    Torrent * t = vtorrent;
1956
1957    torrentLock( t );
1958    rechoke( t );
1959    torrentUnlock( t );
1960    return TRUE;
1961}
1962
1963/***
1964****
1965****  Life and Death
1966****
1967***/
1968
1969static int
1970shouldPeerBeClosed( const Torrent * t,
1971                    const tr_peer * peer,
1972                    int             peerCount )
1973{
1974    const tr_torrent *       tor = t->tor;
1975    const time_t             now = time( NULL );
1976    const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1977
1978    /* if it's marked for purging, close it */
1979    if( peer->doPurge )
1980    {
1981        tordbg( t, "purging peer %s because its doPurge flag is set",
1982               tr_peerIoAddrStr( &atom->addr,
1983                                 atom->port ) );
1984        return TRUE;
1985    }
1986
1987    /* if we're seeding and the peer has everything we have,
1988     * and enough time has passed for a pex exchange, then disconnect */
1989    if( tr_torrentIsSeed( tor ) )
1990    {
1991        int peerHasEverything;
1992        if( atom->flags & ADDED_F_SEED_FLAG )
1993            peerHasEverything = TRUE;
1994        else if( peer->progress < tr_cpPercentDone( tor->completion ) )
1995            peerHasEverything = FALSE;
1996        else
1997        {
1998            tr_bitfield * tmp =
1999                tr_bitfieldDup( tr_cpPieceBitfield( tor->completion ) );
2000            tr_bitfieldDifference( tmp, peer->have );
2001            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2002            tr_bitfieldFree( tmp );
2003        }
2004        if( peerHasEverything
2005          && ( !tr_torrentAllowsPex( tor ) || ( now - atom->time >= 30 ) ) )
2006        {
2007            tordbg( t, "purging peer %s because we're both seeds",
2008                   tr_peerIoAddrStr( &atom->addr,
2009                                     atom->port ) );
2010            return TRUE;
2011        }
2012    }
2013
2014    /* disconnect if it's been too long since piece data has been transferred.
2015     * this is on a sliding scale based on number of available peers... */
2016    {
2017        const int    relaxStrictnessIfFewerThanN =
2018            (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2019        /* if we have >= relaxIfFewerThan, strictness is 100%.
2020         * if we have zero connections, strictness is 0% */
2021        const float  strictness = peerCount >= relaxStrictnessIfFewerThanN
2022                                  ? 1.0
2023                                  : peerCount /
2024                                  (float)relaxStrictnessIfFewerThanN;
2025        const int    lo = MIN_UPLOAD_IDLE_SECS;
2026        const int    hi = MAX_UPLOAD_IDLE_SECS;
2027        const int    limit = lo + ( ( hi - lo ) * strictness );
2028        const time_t then = peer->pieceDataActivityDate;
2029        const int    idleTime = then ? ( now - then ) : 0;
2030        if( idleTime > limit )
2031        {
2032            tordbg(
2033                t,
2034                "purging peer %s because it's been %d secs since we shared anything",
2035                tr_peerIoAddrStr( &atom->addr, atom->port ), idleTime );
2036            return TRUE;
2037        }
2038    }
2039
2040    return FALSE;
2041}
2042
2043static tr_peer **
2044getPeersToClose( Torrent * t,
2045                 int *     setmeSize )
2046{
2047    int               i, peerCount, outsize;
2048    tr_peer **        peers = (tr_peer**) tr_ptrArrayPeek( t->peers,
2049                                                           &peerCount );
2050    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2051
2052    assert( torrentIsLocked( t ) );
2053
2054    for( i = outsize = 0; i < peerCount; ++i )
2055        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
2056            ret[outsize++] = peers[i];
2057
2058    *setmeSize = outsize;
2059    return ret;
2060}
2061
2062static int
2063compareCandidates( const void * va,
2064                   const void * vb )
2065{
2066    const struct peer_atom * a = *(const struct peer_atom**) va;
2067    const struct peer_atom * b = *(const struct peer_atom**) vb;
2068
2069    /* <Charles> Here we would probably want to try reconnecting to
2070     * peers that had most recently given us data. Lots of users have
2071     * trouble with resets due to their routers and/or ISPs. This way we
2072     * can quickly recover from an unwanted reset. So we sort
2073     * piece_data_time in descending order.
2074     */
2075
2076    if( a->piece_data_time != b->piece_data_time )
2077        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2078
2079    if( a->numFails != b->numFails )
2080        return a->numFails < b->numFails ? -1 : 1;
2081
2082    if( a->time != b->time )
2083        return a->time < b->time ? -1 : 1;
2084
2085    return 0;
2086}
2087
2088static int
2089getReconnectIntervalSecs( const struct peer_atom * atom )
2090{
2091    int          sec;
2092    const time_t now = time( NULL );
2093
2094    /* if we were recently connected to this peer and transferring piece
2095     * data, try to reconnect to them sooner rather that later -- we don't
2096     * want network troubles to get in the way of a good peer. */
2097    if( ( now - atom->piece_data_time ) <=
2098       ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2099        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2100
2101    /* don't allow reconnects more often than our minimum */
2102    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2103        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2104
2105    /* otherwise, the interval depends on how many times we've tried
2106     * and failed to connect to the peer */
2107    else switch( atom->numFails )
2108        {
2109            case 0:
2110                sec = 0; break;
2111
2112            case 1:
2113                sec = 5; break;
2114
2115            case 2:
2116                sec = 2 * 60; break;
2117
2118            case 3:
2119                sec = 15 * 60; break;
2120
2121            case 4:
2122                sec = 30 * 60; break;
2123
2124            case 5:
2125                sec = 60 * 60; break;
2126
2127            default:
2128                sec = 120 * 60; break;
2129        }
2130
2131    return sec;
2132}
2133
2134static struct peer_atom **
2135getPeerCandidates(                               Torrent * t,
2136                                           int * setmeSize )
2137{
2138    int                 i, atomCount, retCount;
2139    struct peer_atom ** atoms;
2140    struct peer_atom ** ret;
2141    const time_t        now = time( NULL );
2142    const int           seed = tr_torrentIsSeed( t->tor );
2143
2144    assert( torrentIsLocked( t ) );
2145
2146    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &atomCount );
2147    ret = tr_new( struct peer_atom*, atomCount );
2148    for( i = retCount = 0; i < atomCount; ++i )
2149    {
2150        int                interval;
2151        struct peer_atom * atom = atoms[i];
2152
2153        /* peer fed us too much bad data ... we only keep it around
2154         * now to weed it out in case someone sends it to us via pex */
2155        if( atom->myflags & MYFLAG_BANNED )
2156            continue;
2157
2158        /* peer was unconnectable before, so we're not going to keep trying.
2159         * this is needs a separate flag from `banned', since if they try
2160         * to connect to us later, we'll let them in */
2161        if( atom->myflags & MYFLAG_UNREACHABLE )
2162            continue;
2163
2164        /* we don't need two connections to the same peer... */
2165        if( peerIsInUse( t, &atom->addr ) )
2166            continue;
2167
2168        /* no need to connect if we're both seeds... */
2169        if( seed && ( atom->flags & ADDED_F_SEED_FLAG ) )
2170            continue;
2171
2172        /* don't reconnect too often */
2173        interval = getReconnectIntervalSecs( atom );
2174        if( ( now - atom->time ) < interval )
2175        {
2176            tordbg(
2177                t,
2178                "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2179                i, tr_peerIoAddrStr( &atom->addr,
2180                                     atom->port ), interval );
2181            continue;
2182        }
2183
2184        /* Don't connect to peers in our blocklist */
2185        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2186            continue;
2187
2188        ret[retCount++] = atom;
2189    }
2190
2191    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2192    *setmeSize = retCount;
2193    return ret;
2194}
2195
2196static int
2197reconnectPulse( void * vtorrent )
2198{
2199    Torrent *     t = vtorrent;
2200    static time_t prevTime = 0;
2201    static int    newConnectionsThisSecond = 0;
2202    time_t        now;
2203
2204    torrentLock( t );
2205
2206    now = time( NULL );
2207    if( prevTime != now )
2208    {
2209        prevTime = now;
2210        newConnectionsThisSecond = 0;
2211    }
2212
2213    if( !t->isRunning )
2214    {
2215        removeAllPeers( t );
2216    }
2217    else
2218    {
2219        int                 i, nCandidates, nBad;
2220        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
2221        struct tr_peer **   connections = getPeersToClose( t, &nBad );
2222
2223        if( nBad || nCandidates )
2224            tordbg(
2225                t, "reconnect pulse for [%s]: %d bad connections, "
2226                   "%d connection candidates, %d atoms, max per pulse is %d",
2227                t->tor->info.name, nBad, nCandidates,
2228                tr_ptrArraySize( t->pool ),
2229                (int)MAX_RECONNECTIONS_PER_PULSE );
2230
2231        /* disconnect some peers.
2232           if we transferred piece data, then they might be good peers,
2233           so reset their `numFails' weight to zero.  otherwise we connected
2234           to them fruitlessly, so mark it as another fail */
2235        for( i = 0; i < nBad; ++i )
2236        {
2237            tr_peer *          peer = connections[i];
2238            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
2239            if( peer->pieceDataActivityDate )
2240                atom->numFails = 0;
2241            else
2242                ++atom->numFails;
2243            tordbg( t, "removing bad peer %s",
2244                   tr_peerIoGetAddrStr( peer->io ) );
2245            removePeer( t, peer );
2246        }
2247
2248        /* add some new ones */
2249        for( i = 0;    ( i < nCandidates )
2250           && ( i < MAX_RECONNECTIONS_PER_PULSE )
2251           && ( getPeerCount( t ) < getMaxPeerCount( t->tor ) )
2252           && ( newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND );
2253             ++i )
2254        {
2255            tr_peerMgr *       mgr = t->manager;
2256            struct peer_atom * atom = candidates[i];
2257            tr_peerIo *        io;
2258
2259            tordbg( t, "Starting an OUTGOING connection with %s",
2260                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2261
2262            io =
2263                tr_peerIoNewOutgoing( mgr->session, &atom->addr, atom->port,
2264                                      t->hash );
2265            if( io == NULL )
2266            {
2267                atom->myflags |= MYFLAG_UNREACHABLE;
2268            }
2269            else
2270            {
2271                tr_handshake * handshake = tr_handshakeNew(
2272                    io,
2273                    mgr->session->
2274                    encryptionMode,
2275                    myHandshakeDoneCB,
2276                    mgr );
2277
2278                assert( tr_peerIoGetTorrentHash( io ) );
2279
2280                ++newConnectionsThisSecond;
2281
2282                tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake,
2283                                         handshakeCompare );
2284            }
2285
2286            atom->time = time( NULL );
2287        }
2288
2289        /* cleanup */
2290        tr_free( connections );
2291        tr_free( candidates );
2292    }
2293
2294    torrentUnlock( t );
2295    return TRUE;
2296}
2297
2298/****
2299*****
2300*****  BANDWIDTH ALLOCATION
2301*****
2302****/
2303
2304static double
2305allocateHowMuch( double         desiredAvgKB,
2306                 const double * history )
2307{
2308    const double baseline = desiredAvgKB * 1024.0 /
2309                            BANDWIDTH_PULSES_PER_SECOND;
2310    const double min = baseline * 0.66;
2311    const double max = baseline * 1.33;
2312    int          i;
2313    double       usedBytes;
2314    double       n;
2315    double       clamped;
2316
2317    for( usedBytes = i = 0; i < BANDWIDTH_PULSE_HISTORY; ++i )
2318        usedBytes += history[i];
2319
2320    n = ( desiredAvgKB * 1024.0 )
2321      * ( BANDWIDTH_PULSE_HISTORY + 1.0 )
2322      / BANDWIDTH_PULSES_PER_SECOND
2323      - usedBytes;
2324
2325    /* clamp the return value to lessen oscillation */
2326    clamped = n;
2327    clamped = MAX( clamped, min );
2328    clamped = MIN( clamped, max );
2329/*fprintf( stderr, "desiredAvgKB is %.2f, rate is %.2f, allocating %.2f
2330  (%.2f)\n", desiredAvgKB,
2331  ((usedBytes*BANDWIDTH_PULSES_PER_SECOND)/BANDWIDTH_PULSE_HISTORY)/1024.0,
2332  clamped/1024.0, n/1024.0 );*/
2333    return clamped;
2334}
2335
2336/**
2337 * Distributes a fixed amount of bandwidth among a set of peers.
2338 *
2339 * @param peerArray peers whose client-to-peer bandwidth will be set
2340 * @param direction whether to allocate upload or download bandwidth
2341 * @param history recent bandwidth history for these peers
2342 * @param desiredAvgKB overall bandwidth goal for this set of peers
2343 */
2344static void
2345setPeerBandwidth( tr_ptrArray *      peerArray,
2346                  const tr_direction direction,
2347                  const double *     history,
2348                  double             desiredAvgKB )
2349{
2350    const int    peerCount = tr_ptrArraySize( peerArray );
2351    const double bytes = allocateHowMuch( desiredAvgKB, history );
2352    const double welfareBytes = MIN( 2048, bytes * 0.2 );
2353    const double meritBytes = MAX( 0, bytes - welfareBytes );
2354    tr_peer **   peers = (tr_peer**) tr_ptrArrayBase( peerArray );
2355    tr_peer **   candidates = tr_new( tr_peer *, peerCount );
2356    int          i;
2357    int          candidateCount;
2358    double       welfare;
2359    size_t       bytesUsed;
2360
2361    assert( meritBytes >= 0.0 );
2362    assert( welfareBytes >= 0.0 );
2363    assert( direction == TR_UP || direction == TR_DOWN );
2364
2365    for( i = candidateCount = 0; i < peerCount; ++i )
2366        if( tr_peerIoWantsBandwidth( peers[i]->io, direction ) )
2367            candidates[candidateCount++] = peers[i];
2368        else
2369            tr_peerIoSetBandwidth( peers[i]->io, direction, 0 );
2370
2371    for( i = bytesUsed = 0; i < candidateCount; ++i )
2372        bytesUsed += tr_peerIoGetBandwidthUsed( candidates[i]->io,
2373                                                direction );
2374
2375    welfare = welfareBytes / candidateCount;
2376
2377    for( i = 0; i < candidateCount; ++i )
2378    {
2379        tr_peer *    peer = candidates[i];
2380        const double merit = bytesUsed
2381                             ? ( meritBytes *
2382                                tr_peerIoGetBandwidthUsed( peer->io,
2383                                                           direction ) ) /
2384                             bytesUsed
2385                             : ( meritBytes / candidateCount );
2386        tr_peerIoSetBandwidth( peer->io, direction, merit + welfare );
2387    }
2388
2389    /* cleanup */
2390    tr_free( candidates );
2391}
2392
2393static size_t
2394countHandshakeBandwidth( tr_ptrArray * handshakes,
2395                         tr_direction  direction )
2396{
2397    const int n = tr_ptrArraySize( handshakes );
2398    int       i;
2399    size_t    total;
2400
2401    for( i = total = 0; i < n; ++i )
2402    {
2403        tr_peerIo * io = tr_handshakeGetIO( tr_ptrArrayNth( handshakes, i ) );
2404        total += tr_peerIoGetBandwidthUsed( io, direction );
2405    }
2406    return total;
2407}
2408
2409static size_t
2410countPeerBandwidth( tr_ptrArray * peers,
2411                    tr_direction  direction )
2412{
2413    const int n = tr_ptrArraySize( peers );
2414    int       i;
2415    size_t    total;
2416
2417    for( i = total = 0; i < n; ++i )
2418    {
2419        tr_peer * peer = tr_ptrArrayNth( peers, i );
2420        total += tr_peerIoGetBandwidthUsed( peer->io, direction );
2421    }
2422    return total;
2423}
2424
2425static void
2426givePeersUnlimitedBandwidth( tr_ptrArray * peers,
2427                             tr_direction  direction )
2428{
2429    const int n = tr_ptrArraySize( peers );
2430    int       i;
2431
2432    for( i = 0; i < n; ++i )
2433    {
2434        tr_peer * peer = tr_ptrArrayNth( peers, i );
2435        tr_peerIoSetBandwidthUnlimited( peer->io, direction );
2436    }
2437}
2438
2439static void
2440pumpAllPeers( tr_peerMgr * mgr )
2441{
2442    const int torrentCount = tr_ptrArraySize( mgr->torrents );
2443    int       i, j;
2444
2445    for( i = 0; i < torrentCount; ++i )
2446    {
2447        Torrent * t = tr_ptrArrayNth( mgr->torrents, i );
2448        for( j = 0; j < tr_ptrArraySize( t->peers ); ++j )
2449        {
2450            tr_peer * peer = tr_ptrArrayNth( t->peers, j );
2451            tr_peerMsgsPulse( peer->msgs );
2452        }
2453    }
2454}
2455
2456/**
2457 * Allocate bandwidth for each peer connection.
2458 *
2459 * @param mgr the peer manager
2460 * @param direction whether to allocate upload or download bandwidth
2461 * @return the amount of directional bandwidth used since the last pulse.
2462 */
2463static double
2464allocateBandwidth( tr_peerMgr * mgr,
2465                   tr_direction direction )
2466{
2467    tr_session *  session = mgr->session;
2468    const int     pulseNumber = mgr->bandwidthPulseNumber;
2469    const int     torrentCount = tr_ptrArraySize( mgr->torrents );
2470    Torrent **    torrents = (Torrent **) tr_ptrArrayBase( mgr->torrents );
2471    tr_ptrArray * globalPool = tr_ptrArrayNew( );
2472    double        allBytesUsed = 0;
2473    size_t        poolBytesUsed = 0;
2474    int           i;
2475
2476    assert( mgr );
2477    assert( direction == TR_UP || direction == TR_DOWN );
2478
2479    /* before allocating bandwidth, pump the connected peers */
2480    pumpAllPeers( mgr );
2481
2482    for( i = 0; i < torrentCount; ++i )
2483    {
2484        Torrent *    t = torrents[i];
2485        const size_t used = countPeerBandwidth( t->peers, direction );
2486        countHandshakeBandwidth( t->outgoingHandshakes, direction );
2487
2488        /* remember this torrent's bytes used */
2489        t->tor->rateHistory[direction][pulseNumber] = used;
2490
2491        /* add this torrent's bandwidth use to allBytesUsed */
2492        allBytesUsed += used;
2493
2494        /* process the torrent's peers based on its speed mode */
2495        switch( tr_torrentGetSpeedMode( t->tor, direction ) )
2496        {
2497            case TR_SPEEDLIMIT_UNLIMITED:
2498                givePeersUnlimitedBandwidth( t->peers, direction );
2499                break;
2500
2501            case TR_SPEEDLIMIT_SINGLE:
2502                setPeerBandwidth( t->peers, direction,
2503                                  t->tor->rateHistory[direction],
2504                                  tr_torrentGetSpeedLimit( t->tor,
2505                                                           direction ) );
2506                break;
2507
2508            case TR_SPEEDLIMIT_GLOBAL:
2509            {
2510                int       i;
2511                const int n = tr_ptrArraySize( t->peers );
2512                for( i = 0; i < n; ++i )
2513                    tr_ptrArrayAppend( globalPool,
2514                                      tr_ptrArrayNth( t->peers, i ) );
2515                poolBytesUsed += used;
2516                break;
2517            }
2518        }
2519    }
2520
2521    /* add incoming handshakes to the global pool */
2522    i = countHandshakeBandwidth( mgr->incomingHandshakes, direction );
2523    allBytesUsed += i;
2524    poolBytesUsed += i;
2525
2526    mgr->globalPoolHistory[direction][pulseNumber] = poolBytesUsed;
2527
2528    /* handle the global pool's connections */
2529    if( !tr_sessionIsSpeedLimitEnabled( session, direction ) )
2530        givePeersUnlimitedBandwidth( globalPool, direction );
2531    else
2532        setPeerBandwidth( globalPool, direction,
2533                         mgr->globalPoolHistory[direction],
2534                         tr_sessionGetSpeedLimit( session, direction ) );
2535
2536    /* now that we've allocated bandwidth, pump all the connected peers */
2537    pumpAllPeers( mgr );
2538
2539    /* cleanup */
2540    tr_ptrArrayFree( globalPool, NULL );
2541    return allBytesUsed;
2542}
2543
2544static int
2545bandwidthPulse( void * vmgr )
2546{
2547    tr_peerMgr * mgr = vmgr;
2548    int          i;
2549
2550    managerLock( mgr );
2551
2552    /* keep track of how far we are into the cycle */
2553    if( ++mgr->bandwidthPulseNumber == BANDWIDTH_PULSE_HISTORY )
2554        mgr->bandwidthPulseNumber = 0;
2555
2556    /* allocate the upload and download bandwidth */
2557    for( i = 0; i < 2; ++i )
2558        mgr->rateHistory[i][mgr->bandwidthPulseNumber] =
2559            allocateBandwidth( mgr, i );
2560
2561    managerUnlock( mgr );
2562    return TRUE;
2563}
2564
Note: See TracBrowser for help on using the repository browser.