source: trunk/libtransmission/peer-mgr.c @ 6842

Last change on this file since 6842 was 6842, checked in by charles, 14 years ago

(libT) finish killing tr_errno.

  • Property svn:keywords set to Date Rev Author Id
File size: 69.8 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 6842 2008-10-03 04:49:06Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp, strstr */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "blocklist.h"
23#include "clients.h"
24#include "completion.h"
25#include "crypto.h"
26#include "handshake.h"
27#include "inout.h" /* tr_ioTestPiece */
28#include "net.h"
29#include "peer-io.h"
30#include "peer-mgr.h"
31#include "peer-mgr-private.h"
32#include "peer-msgs.h"
33#include "ptrarray.h"
34#include "ratecontrol.h"
35#include "stats.h" /* tr_statsAddDownloaded */
36#include "torrent.h"
37#include "trevent.h"
38#include "utils.h"
39#include "webseed.h"
40
41enum
42{
43    /* how frequently to change which peers are choked */
44    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
45
46    /* minimum interval for refilling peers' request lists */
47    REFILL_PERIOD_MSEC = 333,
48
49    /* when many peers are available, keep idle ones this long */
50    MIN_UPLOAD_IDLE_SECS = ( 60 * 3 ),
51
52    /* when few peers are available, keep idle ones this long */
53    MAX_UPLOAD_IDLE_SECS = ( 60 * 10 ),
54
55    /* how frequently to decide which peers live and die */
56    RECONNECT_PERIOD_MSEC = ( 2 * 1000 ),
57
58    /* max # of peers to ask fer per torrent per reconnect pulse */
59    MAX_RECONNECTIONS_PER_PULSE = 2,
60
61    /* max number of peers to ask for per second overall.
62    * this throttle is to avoid overloading the router */
63    MAX_CONNECTIONS_PER_SECOND = 4,
64
65    /* number of unchoked peers per torrent.
66     * FIXME: this probably ought to be configurable */
67    MAX_UNCHOKED_PEERS = 12,
68
69    /* number of bad pieces a peer is allowed to send before we ban them */
70    MAX_BAD_PIECES_PER_PEER = 3,
71
72    /* use for bitwise operations w/peer_atom.myflags */
73    MYFLAG_BANNED = 1,
74
75    /* unreachable for now... but not banned.
76     * if they try to connect to us it's okay */
77    MYFLAG_UNREACHABLE = 2,
78
79    /* the minimum we'll wait before attempting to reconnect to a peer */
80    MINIMUM_RECONNECT_INTERVAL_SECS = 5
81};
82
83
84/**
85***
86**/
87
88/* We keep one of these for every peer we know about, whether
89 * it's connected or not, so the struct must be small.
90 * When our current connections underperform, we dip back
91 * into this list for new ones. */
92struct peer_atom
93{
94    uint8_t           from;
95    uint8_t           flags; /* these match the added_f flags */
96    uint8_t           myflags; /* flags that aren't defined in added_f */
97    uint16_t          port;
98    uint16_t          numFails;
99    struct in_addr    addr;
100    time_t            time; /* the last time the peer's connection status
101                              changed */
102    time_t            piece_data_time;
103};
104
105typedef struct
106{
107    uint8_t         hash[SHA_DIGEST_LENGTH];
108    tr_ptrArray *   outgoingHandshakes; /* tr_handshake */
109    tr_ptrArray *   pool; /* struct peer_atom */
110    tr_ptrArray *   peers; /* tr_peer */
111    tr_ptrArray *   webseeds; /* tr_webseed */
112    tr_timer *      reconnectTimer;
113    tr_timer *      rechokeTimer;
114    tr_timer *      refillTimer;
115    tr_torrent *    tor;
116    tr_peer *       optimistic; /* the optimistic peer, or NULL if none */
117    tr_bitfield *   requestedPieces;
118
119    unsigned int    isRunning : 1;
120
121    struct tr_peerMgr * manager;
122}
123Torrent;
124
125struct tr_peerMgr
126{
127    uint8_t        bandwidthPulseNumber;
128    tr_session *   session;
129    tr_ptrArray *  torrents; /* Torrent */
130    tr_ptrArray *  incomingHandshakes; /* tr_handshake */
131    tr_timer *     bandwidthTimer;
132    double         rateHistory[2][BANDWIDTH_PULSE_HISTORY];
133    double         globalPoolHistory[2][BANDWIDTH_PULSE_HISTORY];
134};
135
136#define tordbg( t, fmt... ) \
137    tr_deepLog( __FILE__, __LINE__, t->tor->info.name, ## fmt )
138
139#define dbgmsg( fmt... ) \
140    tr_deepLog( __FILE__, __LINE__, NULL, ## fmt )
141
142/**
143***
144**/
145
146static void
147managerLock( const struct tr_peerMgr * manager )
148{
149    tr_globalLock( manager->session );
150}
151
152static void
153managerUnlock( const struct tr_peerMgr * manager )
154{
155    tr_globalUnlock( manager->session );
156}
157
158static void
159torrentLock( Torrent * torrent )
160{
161    managerLock( torrent->manager );
162}
163
164static void
165torrentUnlock( Torrent * torrent )
166{
167    managerUnlock( torrent->manager );
168}
169
170static int
171torrentIsLocked( const Torrent * t )
172{
173    return tr_globalIsLocked( t->manager->session );
174}
175
176/**
177***
178**/
179
180static int
181compareAddresses( const struct in_addr * a,
182                  const struct in_addr * b )
183{
184    if( a->s_addr != b->s_addr )
185        return a->s_addr < b->s_addr ? -1 : 1;
186
187    return 0;
188}
189
190static int
191handshakeCompareToAddr( const void * va,
192                        const void * vb )
193{
194    const tr_handshake * a = va;
195
196    return compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
197}
198
199static int
200handshakeCompare( const void * a,
201                  const void * b )
202{
203    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
204}
205
206static tr_handshake*
207getExistingHandshake( tr_ptrArray *          handshakes,
208                      const struct in_addr * in_addr )
209{
210    return tr_ptrArrayFindSorted( handshakes,
211                                  in_addr,
212                                  handshakeCompareToAddr );
213}
214
215static int
216comparePeerAtomToAddress( const void * va,
217                          const void * vb )
218{
219    const struct peer_atom * a = va;
220
221    return compareAddresses( &a->addr, vb );
222}
223
224static int
225comparePeerAtoms( const void * va,
226                  const void * vb )
227{
228    const struct peer_atom * b = vb;
229
230    return comparePeerAtomToAddress( va, &b->addr );
231}
232
233/**
234***
235**/
236
237static int
238torrentCompare( const void * va,
239                const void * vb )
240{
241    const Torrent * a = va;
242    const Torrent * b = vb;
243
244    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
245}
246
247static int
248torrentCompareToHash( const void * va,
249                      const void * vb )
250{
251    const Torrent * a = va;
252    const uint8_t * b_hash = vb;
253
254    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
255}
256
257static Torrent*
258getExistingTorrent( tr_peerMgr *    manager,
259                    const uint8_t * hash )
260{
261    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
262                                             hash,
263                                             torrentCompareToHash );
264}
265
266static int
267peerCompare( const void * va,
268             const void * vb )
269{
270    const tr_peer * a = va;
271    const tr_peer * b = vb;
272
273    return compareAddresses( &a->in_addr, &b->in_addr );
274}
275
276static int
277peerCompareToAddr( const void * va,
278                   const void * vb )
279{
280    const tr_peer * a = va;
281
282    return compareAddresses( &a->in_addr, vb );
283}
284
285static tr_peer*
286getExistingPeer( Torrent *              torrent,
287                 const struct in_addr * in_addr )
288{
289    assert( torrentIsLocked( torrent ) );
290    assert( in_addr );
291
292    return tr_ptrArrayFindSorted( torrent->peers,
293                                  in_addr,
294                                  peerCompareToAddr );
295}
296
297static struct peer_atom*
298getExistingAtom( const                  Torrent * t,
299                 const struct in_addr * addr )
300{
301    assert( torrentIsLocked( t ) );
302    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
303}
304
305static int
306peerIsInUse( const Torrent *        ct,
307             const struct in_addr * addr )
308{
309    Torrent * t = (Torrent*) ct;
310
311    assert( torrentIsLocked ( t ) );
312
313    return getExistingPeer( t, addr )
314           || getExistingHandshake( t->outgoingHandshakes, addr )
315           || getExistingHandshake( t->manager->incomingHandshakes, addr );
316}
317
318static tr_peer*
319peerConstructor( const struct in_addr * in_addr )
320{
321    tr_peer * p;
322
323    p = tr_new0( tr_peer, 1 );
324    memcpy( &p->in_addr, in_addr, sizeof( struct in_addr ) );
325    return p;
326}
327
328static tr_peer*
329getPeer( Torrent *              torrent,
330         const struct in_addr * in_addr )
331{
332    tr_peer * peer;
333
334    assert( torrentIsLocked( torrent ) );
335
336    peer = getExistingPeer( torrent, in_addr );
337
338    if( peer == NULL )
339    {
340        peer = peerConstructor( in_addr );
341        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
342    }
343
344    return peer;
345}
346
347static void
348peerDestructor( tr_peer * peer )
349{
350    assert( peer );
351    assert( peer->msgs );
352
353    tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
354    tr_peerMsgsFree( peer->msgs );
355
356    tr_peerIoFree( peer->io );
357
358    tr_bitfieldFree( peer->have );
359    tr_bitfieldFree( peer->blame );
360    tr_free( peer->client );
361    tr_free( peer );
362}
363
364static void
365removePeer( Torrent * t,
366            tr_peer * peer )
367{
368    tr_peer *          removed;
369    struct peer_atom * atom;
370
371    assert( torrentIsLocked( t ) );
372
373    atom = getExistingAtom( t, &peer->in_addr );
374    assert( atom );
375    atom->time = time( NULL );
376
377    removed = tr_ptrArrayRemoveSorted( t->peers, peer, peerCompare );
378    assert( removed == peer );
379    peerDestructor( removed );
380}
381
382static void
383removeAllPeers( Torrent * t )
384{
385    while( !tr_ptrArrayEmpty( t->peers ) )
386        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
387}
388
389static void
390torrentDestructor( void * vt )
391{
392    Torrent * t = vt;
393    uint8_t   hash[SHA_DIGEST_LENGTH];
394
395    assert( t );
396    assert( !t->isRunning );
397    assert( t->peers );
398    assert( torrentIsLocked( t ) );
399    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
400    assert( tr_ptrArrayEmpty( t->peers ) );
401
402    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
403
404    tr_timerFree( &t->reconnectTimer );
405    tr_timerFree( &t->rechokeTimer );
406    tr_timerFree( &t->refillTimer );
407
408    tr_bitfieldFree( t->requestedPieces );
409    tr_ptrArrayFree( t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
410    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
411    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
412    tr_ptrArrayFree( t->peers, NULL );
413
414    tr_free( t );
415}
416
417static void peerCallbackFunc( void * vpeer,
418                              void * vevent,
419                              void * vt );
420
421static Torrent*
422torrentConstructor( tr_peerMgr * manager,
423                    tr_torrent * tor )
424{
425    int       i;
426    Torrent * t;
427
428    t = tr_new0( Torrent, 1 );
429    t->manager = manager;
430    t->tor = tor;
431    t->pool = tr_ptrArrayNew( );
432    t->peers = tr_ptrArrayNew( );
433    t->webseeds = tr_ptrArrayNew( );
434    t->outgoingHandshakes = tr_ptrArrayNew( );
435    t->requestedPieces = tr_bitfieldNew( tor->info.pieceCount );
436    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
437
438    for( i = 0; i < tor->info.webseedCount; ++i )
439    {
440        tr_webseed * w =
441            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc,
442                           t );
443        tr_ptrArrayAppend( t->webseeds, w );
444    }
445
446    return t;
447}
448
449/**
450 * For explanation, see http://www.bittorrent.org/fast_extensions.html
451 * Also see the "test-allowed-set" unit test
452 *
453 * @param k number of pieces in set
454 * @param sz number of pieces in the torrent
455 * @param infohash torrent's SHA1 hash
456 * @param ip peer's address
457 */
458struct tr_bitfield *
459tr_peerMgrGenerateAllowedSet(
460    const uint32_t         k,
461    const uint32_t         sz,
462    const                  uint8_t        *
463                           infohash,
464    const struct in_addr * ip )
465{
466    uint8_t       w[SHA_DIGEST_LENGTH + 4];
467    uint8_t       x[SHA_DIGEST_LENGTH];
468    tr_bitfield * a;
469    uint32_t      a_size;
470
471    *(uint32_t*)w = ntohl( htonl( ip->s_addr ) & 0xffffff00 );   /* (1) */
472    memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );              /* (2) */
473    tr_sha1( x, w, sizeof( w ), NULL );                        /* (3) */
474
475    a = tr_bitfieldNew( sz );
476    a_size = 0;
477
478    while( a_size < k )
479    {
480        int i;
481        for( i = 0; i < 5 && a_size < k; ++i )                      /* (4) */
482        {
483            uint32_t j = i * 4;                                /* (5) */
484            uint32_t y = ntohl( *( uint32_t* )( x + j ) );             /* (6) */
485            uint32_t index = y % sz;                           /* (7) */
486            if( !tr_bitfieldHas( a, index ) )                  /* (8) */
487            {
488                tr_bitfieldAdd( a, index );                    /* (9) */
489                ++a_size;
490            }
491        }
492        tr_sha1( x, x, sizeof( x ), NULL );                    /* (3) */
493    }
494
495    return a;
496}
497
498static int bandwidthPulse( void * vmgr );
499
500tr_peerMgr*
501tr_peerMgrNew( tr_session * session )
502{
503    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
504
505    m->session = session;
506    m->torrents = tr_ptrArrayNew( );
507    m->incomingHandshakes = tr_ptrArrayNew( );
508    m->bandwidthPulseNumber = -1;
509    m->bandwidthTimer = tr_timerNew( session, bandwidthPulse,
510                                     m, 1000 / BANDWIDTH_PULSES_PER_SECOND );
511    return m;
512}
513
514void
515tr_peerMgrFree( tr_peerMgr * manager )
516{
517    managerLock( manager );
518
519    tr_timerFree( &manager->bandwidthTimer );
520
521    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
522     * the item from manager->handshakes, so this is a little roundabout... */
523    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
524        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
525
526    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
527
528    /* free the torrents. */
529    tr_ptrArrayFree( manager->torrents, torrentDestructor );
530
531    managerUnlock( manager );
532    tr_free( manager );
533}
534
535static tr_peer**
536getConnectedPeers( Torrent * t,
537                   int *     setmeCount )
538{
539    int       i, peerCount, connectionCount;
540    tr_peer **peers;
541    tr_peer **ret;
542
543    assert( torrentIsLocked( t ) );
544
545    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
546    ret = tr_new( tr_peer *, peerCount );
547
548    for( i = connectionCount = 0; i < peerCount; ++i )
549        if( peers[i]->msgs )
550            ret[connectionCount++] = peers[i];
551
552    *setmeCount = connectionCount;
553    return ret;
554}
555
556static int
557clientIsDownloadingFrom( const tr_peer * peer )
558{
559    return peer->clientIsInterested && !peer->clientIsChoked;
560}
561
562static int
563clientIsUploadingTo( const tr_peer * peer )
564{
565    return peer->peerIsInterested && !peer->peerIsChoked;
566}
567
568/***
569****
570***/
571
572int
573tr_peerMgrPeerIsSeed( const tr_peerMgr *     mgr,
574                      const uint8_t *        torrentHash,
575                      const struct in_addr * addr )
576{
577    int                      isSeed = FALSE;
578    const Torrent *          t = NULL;
579    const struct peer_atom * atom = NULL;
580
581    t = getExistingTorrent( (tr_peerMgr*)mgr, torrentHash );
582    if( t )
583        atom = getExistingAtom( t, addr );
584    if( atom )
585        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
586
587    return isSeed;
588}
589
590/***
591****  Refill
592***/
593
594struct tr_refill_piece
595{
596    tr_priority_t    priority;
597    int              random;
598    uint32_t         piece;
599    uint32_t         peerCount;
600};
601
602static int
603compareRefillPiece( const void * aIn,
604                    const void * bIn )
605{
606    const struct tr_refill_piece * a = aIn;
607    const struct tr_refill_piece * b = bIn;
608
609    /* if one piece has a higher priority, it goes first */
610    if( a->priority != b->priority )
611        return a->priority > b->priority ? -1 : 1;
612
613    /* otherwise if one has fewer peers, it goes first */
614    if( a->peerCount != b->peerCount )
615        return a->peerCount < b->peerCount ? -1 : 1;
616
617    /* otherwise go with our random seed */
618    if( a->random != b->random )
619        return a->random < b->random ? -1 : 1;
620
621    return 0;
622}
623
624static int
625isPieceInteresting( const tr_torrent * tor,
626                    tr_piece_index_t   piece )
627{
628    if( tor->info.pieces[piece].dnd ) /* we don't want it */
629        return 0;
630
631    if( tr_cpPieceIsComplete( tor->completion, piece ) ) /* we have it */
632        return 0;
633
634    return 1;
635}
636
637static uint32_t*
638getPreferredPieces( Torrent *  t,
639                    uint32_t * pieceCount )
640{
641    const tr_torrent * tor = t->tor;
642    const tr_info *    inf = &tor->info;
643    tr_piece_index_t   i;
644    uint32_t           poolSize = 0;
645    uint32_t *         pool = tr_new( uint32_t, inf->pieceCount );
646    int                peerCount;
647    tr_peer**          peers;
648
649    assert( torrentIsLocked( t ) );
650
651    peers = getConnectedPeers( t, &peerCount );
652
653    for( i = 0; i < inf->pieceCount; ++i )
654        if( isPieceInteresting( tor, i ) )
655            pool[poolSize++] = i;
656
657    /* sort the pool from most interesting to least... */
658    if( poolSize > 1 )
659    {
660        uint32_t                 j;
661        struct tr_refill_piece * p = tr_new( struct tr_refill_piece,
662                                             poolSize );
663
664        for( j = 0; j < poolSize; ++j )
665        {
666            int                      k;
667            const tr_piece_index_t   piece = pool[j];
668            struct tr_refill_piece * setme = p + j;
669
670            setme->piece = piece;
671            setme->priority = inf->pieces[piece].priority;
672            setme->peerCount = 0;
673            setme->random = tr_cryptoWeakRandInt( INT_MAX );
674
675            for( k = 0; k < peerCount; ++k )
676            {
677                const tr_peer * peer = peers[k];
678                if( peer->peerIsInterested && !peer->clientIsChoked
679                  && tr_bitfieldHas( peer->have, piece ) )
680                    ++setme->peerCount;
681            }
682        }
683
684        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
685               compareRefillPiece );
686
687        for( j = 0; j < poolSize; ++j )
688            pool[j] = p[j].piece;
689
690        tr_free( p );
691    }
692
693    tr_free( peers );
694
695    *pieceCount = poolSize;
696    return pool;
697}
698
699static tr_peer**
700getPeersUploadingToClient( Torrent * t,
701                           int *     setmeCount )
702{
703    int        i;
704    int        peerCount = 0;
705    int        retCount = 0;
706    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
707    tr_peer ** ret = tr_new( tr_peer *, peerCount );
708
709    /* get a list of peers we're downloading from */
710    for( i = 0; i < peerCount; ++i )
711        if( clientIsDownloadingFrom( peers[i] ) )
712            ret[retCount++] = peers[i];
713
714    /* pick a different starting point each time so all peers
715     * get a chance at the first blocks in the queue */
716    if( retCount )
717    {
718        tr_peer ** tmp = tr_new( tr_peer *, retCount );
719        i = tr_cryptoWeakRandInt( retCount );
720        memcpy( tmp, ret, sizeof( tr_peer* ) * retCount );
721        memcpy( ret, tmp + i, sizeof( tr_peer* ) * ( retCount - i ) );
722        memcpy( ret + ( retCount - i ), tmp, sizeof( tr_peer* ) * i );
723        tr_free( tmp );
724    }
725
726    *setmeCount = retCount;
727    return ret;
728}
729
730static int
731refillPulse( void * vtorrent )
732{
733    Torrent *        t = vtorrent;
734    tr_torrent *     tor = t->tor;
735    int              peerCount;
736    int              webseedCount;
737    tr_peer **       peers;
738    tr_webseed **    webseeds;
739    uint32_t         pieceCount;
740    uint32_t *       pieces;
741    tr_piece_index_t i;
742
743    if( !t->isRunning )
744        return TRUE;
745    if( tr_torrentIsSeed( t->tor ) )
746        return TRUE;
747
748    torrentLock( t );
749    tordbg( t, "Refilling Request Buffers..." );
750
751    pieces = getPreferredPieces( t, &pieceCount );
752    peers = getPeersUploadingToClient( t, &peerCount );
753    webseedCount = tr_ptrArraySize( t->webseeds );
754    webseeds = tr_memdup( tr_ptrArrayBase(
755                             t->webseeds ), webseedCount *
756                         sizeof( tr_webseed* ) );
757
758    for( i = 0; ( webseedCount || peerCount ) && i < pieceCount; ++i )
759    {
760        int                    j;
761        int                    handled = FALSE;
762        const tr_piece_index_t piece = pieces[i];
763
764        assert( piece < tor->info.pieceCount );
765
766        /* find a peer who can ask for this piece */
767        for( j = 0; !handled && j < peerCount; )
768        {
769            const tr_addreq_t val = tr_peerMsgsAddRequest( peers[j]->msgs,
770                                                           piece );
771            switch( val )
772            {
773                case TR_ADDREQ_FULL:
774                case TR_ADDREQ_CLIENT_CHOKED:
775                    peers[j] = peers[--peerCount];
776                    break;
777
778                case TR_ADDREQ_MISSING:
779                case TR_ADDREQ_DUPLICATE:
780                    ++j;
781                    break;
782
783                case TR_ADDREQ_OK:
784                    tr_bitfieldAdd( t->requestedPieces, piece );
785                    handled = TRUE;
786                    break;
787
788                default:
789                    assert( 0 && "unhandled value" );
790                    break;
791            }
792        }
793
794        /* maybe one of the webseeds can do it */
795        for( j = 0; !handled && j < webseedCount; )
796        {
797            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j],
798                                                          piece );
799            switch( val )
800            {
801                case TR_ADDREQ_FULL:
802                    webseeds[j] = webseeds[--webseedCount];
803                    break;
804
805                case TR_ADDREQ_OK:
806                    tr_bitfieldAdd( t->requestedPieces, piece );
807                    handled = TRUE;
808                    break;
809
810                default:
811                    assert( 0 && "unhandled value" );
812                    break;
813            }
814        }
815    }
816
817    /* cleanup */
818    tr_free( webseeds );
819    tr_free( peers );
820    tr_free( pieces );
821
822    t->refillTimer = NULL;
823    torrentUnlock( t );
824    return FALSE;
825}
826
827static void
828addStrike( Torrent * t,
829           tr_peer * peer )
830{
831    tordbg( t, "increasing peer %s strike count to %d",
832            tr_peerIoAddrStr( &peer->in_addr,
833                              peer->port ), peer->strikes + 1 );
834
835    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
836    {
837        struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
838        atom->myflags |= MYFLAG_BANNED;
839        peer->doPurge = 1;
840        tordbg( t, "banning peer %s",
841               tr_peerIoAddrStr( &atom->addr, atom->port ) );
842    }
843}
844
845static void
846gotBadPiece( Torrent *        t,
847             tr_piece_index_t pieceIndex )
848{
849    tr_torrent *   tor = t->tor;
850    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
851
852    tor->corruptCur += byteCount;
853    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
854}
855
856static void
857refillSoon( Torrent * t )
858{
859    if( t->refillTimer == NULL )
860        t->refillTimer = tr_timerNew( t->manager->session,
861                                      refillPulse, t,
862                                      REFILL_PERIOD_MSEC );
863}
864
865static void
866peerCallbackFunc( void * vpeer,
867                  void * vevent,
868                  void * vt )
869{
870    tr_peer *             peer = vpeer; /* may be NULL if peer is a webseed */
871    Torrent *             t = (Torrent *) vt;
872    const tr_peer_event * e = vevent;
873
874    torrentLock( t );
875
876    switch( e->eventType )
877    {
878        case TR_PEER_NEED_REQ:
879            refillSoon( t );
880            break;
881
882        case TR_PEER_CANCEL:
883            tr_bitfieldRem( t->requestedPieces, e->pieceIndex );
884            break;
885
886        case TR_PEER_PEER_GOT_DATA:
887        {
888            const time_t now = time( NULL );
889            tr_torrent * tor = t->tor;
890            tor->activityDate = now;
891            tor->uploadedCur += e->length;
892            tr_statsAddUploaded( tor->session, e->length );
893            if( peer )
894            {
895                struct peer_atom * atom = getExistingAtom( t,
896                                                           &peer->in_addr );
897                atom->piece_data_time = time( NULL );
898            }
899            break;
900        }
901
902        case TR_PEER_CLIENT_GOT_DATA:
903        {
904            const time_t now = time( NULL );
905            tr_torrent * tor = t->tor;
906            tor->activityDate = now;
907            /* only add this to downloadedCur if we got it from a peer --
908             * webseeds shouldn't count against our ratio.  As one tracker
909             * admin put it, "Those pieces are downloaded directly from the
910             * content distributor, not the peers, it is the tracker's job
911             * to manage the swarms, not the web server and does not fit
912             * into the jurisdiction of the tracker." */
913            if( peer )
914                tor->downloadedCur += e->length;
915            tr_statsAddDownloaded( tor->session, e->length );
916            if( peer )
917            {
918                struct peer_atom * atom = getExistingAtom( t,
919                                                           &peer->in_addr );
920                atom->piece_data_time = time( NULL );
921            }
922            break;
923        }
924
925        case TR_PEER_PEER_PROGRESS:
926        {
927            if( peer )
928            {
929                struct peer_atom * atom = getExistingAtom( t,
930                                                           &peer->in_addr );
931                const int          peerIsSeed = e->progress >= 1.0;
932                if( peerIsSeed )
933                {
934                    tordbg( t, "marking peer %s as a seed",
935                           tr_peerIoAddrStr( &atom->addr,
936                                             atom->port ) );
937                    atom->flags |= ADDED_F_SEED_FLAG;
938                }
939                else
940                {
941                    tordbg( t, "marking peer %s as a non-seed",
942                           tr_peerIoAddrStr( &atom->addr,
943                                             atom->port ) );
944                    atom->flags &= ~ADDED_F_SEED_FLAG;
945                }
946            }
947            break;
948        }
949
950        case TR_PEER_CLIENT_GOT_BLOCK:
951        {
952            tr_torrent *     tor = t->tor;
953
954            tr_block_index_t block = _tr_block( tor, e->pieceIndex,
955                                                e->offset );
956
957            tr_cpBlockAdd( tor->completion, block );
958
959            if( tr_cpPieceIsComplete( tor->completion, e->pieceIndex ) )
960            {
961                const tr_piece_index_t p = e->pieceIndex;
962                const int              ok = tr_ioTestPiece( tor, p );
963
964                if( !ok )
965                {
966                    tr_torerr( tor,
967                              _( "Piece %lu, which was just downloaded, failed its checksum test" ),
968                              (unsigned long)p );
969                }
970
971                tr_torrentSetHasPiece( tor, p, ok );
972                tr_torrentSetPieceChecked( tor, p, TRUE );
973                tr_peerMgrSetBlame( tor->session->peerMgr, tor->info.hash, p, ok );
974
975                if( !ok )
976                    gotBadPiece( t, p );
977                else
978                {
979                    int        i, peerCount;
980                    tr_peer ** peers = getConnectedPeers( t, &peerCount );
981                    for( i = 0; i < peerCount; ++i )
982                        tr_peerMsgsHave( peers[i]->msgs, p );
983                    tr_free( peers );
984                }
985
986                tr_torrentRecheckCompleteness( tor );
987            }
988            break;
989        }
990
991        case TR_PEER_ERROR:
992            if( e->err == EINVAL )
993            {
994                addStrike( t, peer );
995                peer->doPurge = 1;
996            }
997            else if( ( e->err == EPROTO )
998                  || ( e->err == ERANGE )
999                  || ( e->err == EMSGSIZE )
1000                  || ( e->err == ENOTCONN ) )
1001            {
1002                /* some protocol error from the peer */
1003                peer->doPurge = 1;
1004            }
1005            else /* a local error, such as an IO error */
1006            {
1007                t->tor->error = e->err;
1008                tr_strlcpy( t->tor->errorString,
1009                            tr_strerror( t->tor->error ),
1010                            sizeof( t->tor->errorString ) );
1011                tr_torrentStop( t->tor );
1012            }
1013            break;
1014
1015        default:
1016            assert( 0 );
1017    }
1018
1019    torrentUnlock( t );
1020}
1021
1022static void
1023ensureAtomExists( Torrent *              t,
1024                  const struct in_addr * addr,
1025                  uint16_t               port,
1026                  uint8_t                flags,
1027                  uint8_t                from )
1028{
1029    if( getExistingAtom( t, addr ) == NULL )
1030    {
1031        struct peer_atom * a;
1032        a = tr_new0( struct peer_atom, 1 );
1033        a->addr = *addr;
1034        a->port = port;
1035        a->flags = flags;
1036        a->from = from;
1037        tordbg( t, "got a new atom: %s",
1038               tr_peerIoAddrStr( &a->addr, a->port ) );
1039        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
1040    }
1041}
1042
1043static int
1044getMaxPeerCount( const tr_torrent * tor )
1045{
1046    return tor->maxConnectedPeers;
1047}
1048
1049static int
1050getPeerCount( const Torrent * t )
1051{
1052    return tr_ptrArraySize( t->peers ) + tr_ptrArraySize(
1053               t->outgoingHandshakes );
1054}
1055
1056/* FIXME: this is kind of a mess. */
1057static int
1058myHandshakeDoneCB( tr_handshake *  handshake,
1059                   tr_peerIo *     io,
1060                   int             isConnected,
1061                   const uint8_t * peer_id,
1062                   void *          vmanager )
1063{
1064    int                    ok = isConnected;
1065    int                    success = FALSE;
1066    uint16_t               port;
1067    const struct in_addr * addr;
1068    tr_peerMgr *           manager = (tr_peerMgr*) vmanager;
1069    Torrent *              t;
1070    tr_handshake *         ours;
1071
1072    assert( io );
1073    assert( isConnected == 0 || isConnected == 1 );
1074
1075    t = tr_peerIoHasTorrentHash( io )
1076        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1077        : NULL;
1078
1079    if( tr_peerIoIsIncoming ( io ) )
1080        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
1081                                        handshake, handshakeCompare );
1082    else if( t )
1083        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
1084                                        handshake, handshakeCompare );
1085    else
1086        ours = handshake;
1087
1088    assert( ours );
1089    assert( ours == handshake );
1090
1091    if( t )
1092        torrentLock( t );
1093
1094    addr = tr_peerIoGetAddress( io, &port );
1095
1096    if( !ok || !t || !t->isRunning )
1097    {
1098        if( t )
1099        {
1100            struct peer_atom * atom = getExistingAtom( t, addr );
1101            if( atom )
1102                ++atom->numFails;
1103        }
1104
1105        tr_peerIoFree( io );
1106    }
1107    else /* looking good */
1108    {
1109        struct peer_atom * atom;
1110        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1111        atom = getExistingAtom( t, addr );
1112        atom->time = time( NULL );
1113
1114        if( atom->myflags & MYFLAG_BANNED )
1115        {
1116            tordbg( t, "banned peer %s tried to reconnect",
1117                   tr_peerIoAddrStr( &atom->addr,
1118                                     atom->port ) );
1119            tr_peerIoFree( io );
1120        }
1121        else if( tr_peerIoIsIncoming( io )
1122               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1123
1124        {
1125            tr_peerIoFree( io );
1126        }
1127        else
1128        {
1129            tr_peer * peer = getExistingPeer( t, addr );
1130
1131            if( peer ) /* we already have this peer */
1132            {
1133                tr_peerIoFree( io );
1134            }
1135            else
1136            {
1137                peer = getPeer( t, addr );
1138                tr_free( peer->client );
1139
1140                if( !peer_id )
1141                    peer->client = NULL;
1142                else
1143                {
1144                    char client[128];
1145                    tr_clientForId( client, sizeof( client ), peer_id );
1146                    peer->client = tr_strdup( client );
1147                }
1148                peer->port = port;
1149                peer->io = io;
1150                peer->msgs =
1151                    tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t,
1152                                    &peer->msgsTag );
1153
1154                success = TRUE;
1155            }
1156        }
1157    }
1158
1159    if( t )
1160        torrentUnlock( t );
1161
1162    return success;
1163}
1164
1165void
1166tr_peerMgrAddIncoming( tr_peerMgr *     manager,
1167                       struct in_addr * addr,
1168                       uint16_t         port,
1169                       int              socket )
1170{
1171    managerLock( manager );
1172
1173    if( tr_sessionIsAddressBlocked( manager->session, addr ) )
1174    {
1175        tr_dbg( "Banned IP address \"%s\" tried to connect to us",
1176               inet_ntoa( *addr ) );
1177        tr_netClose( socket );
1178    }
1179    else if( getExistingHandshake( manager->incomingHandshakes, addr ) )
1180    {
1181        tr_netClose( socket );
1182    }
1183    else /* we don't have a connetion to them yet... */
1184    {
1185        tr_peerIo *    io;
1186        tr_handshake * handshake;
1187
1188        io = tr_peerIoNewIncoming( manager->session, addr, port, socket );
1189
1190        handshake = tr_handshakeNew( io,
1191                                     manager->session->encryptionMode,
1192                                     myHandshakeDoneCB,
1193                                     manager );
1194
1195        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake,
1196                                 handshakeCompare );
1197    }
1198
1199    managerUnlock( manager );
1200}
1201
1202void
1203tr_peerMgrAddPex( tr_peerMgr *    manager,
1204                  const uint8_t * torrentHash,
1205                  uint8_t         from,
1206                  const tr_pex *  pex )
1207{
1208    Torrent * t;
1209
1210    managerLock( manager );
1211
1212    t = getExistingTorrent( manager, torrentHash );
1213    if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->in_addr ) )
1214        ensureAtomExists( t, &pex->in_addr, pex->port, pex->flags, from );
1215
1216    managerUnlock( manager );
1217}
1218
1219tr_pex *
1220tr_peerMgrCompactToPex( const void *    compact,
1221                        size_t          compactLen,
1222                        const uint8_t * added_f,
1223                        size_t          added_f_len,
1224                        size_t *        pexCount )
1225{
1226    size_t          i;
1227    size_t          n = compactLen / 6;
1228    const uint8_t * walk = compact;
1229    tr_pex *        pex = tr_new0( tr_pex, n );
1230
1231    for( i = 0; i < n; ++i )
1232    {
1233        memcpy( &pex[i].in_addr, walk, 4 ); walk += 4;
1234        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1235        if( added_f && ( n == added_f_len ) )
1236            pex[i].flags = added_f[i];
1237    }
1238
1239    *pexCount = n;
1240    return pex;
1241}
1242
1243/**
1244***
1245**/
1246
1247void
1248tr_peerMgrSetBlame( tr_peerMgr *     manager,
1249                    const uint8_t *  torrentHash,
1250                    tr_piece_index_t pieceIndex,
1251                    int              success )
1252{
1253    if( !success )
1254    {
1255        int        peerCount, i;
1256        Torrent *  t = getExistingTorrent( manager, torrentHash );
1257        tr_peer ** peers;
1258
1259        assert( torrentIsLocked( t ) );
1260
1261        peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1262        for( i = 0; i < peerCount; ++i )
1263        {
1264            tr_peer * peer = peers[i];
1265            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1266            {
1267                tordbg(
1268                    t,
1269                    "peer %s contributed to corrupt piece (%d); now has %d strikes",
1270                    tr_peerIoAddrStr( &peer->in_addr, peer->port ),
1271                    pieceIndex, (int)peer->strikes + 1 );
1272                addStrike( t, peer );
1273            }
1274        }
1275    }
1276}
1277
1278int
1279tr_pexCompare( const void * va,
1280               const void * vb )
1281{
1282    const tr_pex * a = va;
1283    const tr_pex * b = vb;
1284    int            i =
1285        memcmp( &a->in_addr, &b->in_addr, sizeof( struct in_addr ) );
1286
1287    if( i ) return i;
1288    if( a->port < b->port ) return -1;
1289    if( a->port > b->port ) return 1;
1290    return 0;
1291}
1292
1293int tr_pexCompare( const void * a,
1294                   const void * b );
1295
1296static int
1297peerPrefersCrypto( const tr_peer * peer )
1298{
1299    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1300        return TRUE;
1301
1302    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1303        return FALSE;
1304
1305    return tr_peerIoIsEncrypted( peer->io );
1306}
1307
1308int
1309tr_peerMgrGetPeers( tr_peerMgr *    manager,
1310                    const uint8_t * torrentHash,
1311                    tr_pex **       setme_pex )
1312{
1313    const Torrent *  t = getExistingTorrent( (tr_peerMgr*)manager,
1314                                            torrentHash );
1315    int              i, peerCount;
1316    const tr_peer ** peers;
1317    tr_pex *         pex;
1318    tr_pex *         walk;
1319
1320    managerLock( manager );
1321
1322    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1323    pex = walk = tr_new( tr_pex, peerCount );
1324
1325    for( i = 0; i < peerCount; ++i, ++walk )
1326    {
1327        const tr_peer * peer = peers[i];
1328
1329        walk->in_addr = peer->in_addr;
1330
1331        walk->port = peer->port;
1332
1333        walk->flags = 0;
1334        if( peerPrefersCrypto( peer ) ) walk->flags |=
1335                ADDED_F_ENCRYPTION_FLAG;
1336        if( peer->progress >= 1.0 ) walk->flags |= ADDED_F_SEED_FLAG;
1337    }
1338
1339    assert( ( walk - pex ) == peerCount );
1340    qsort( pex, peerCount, sizeof( tr_pex ), tr_pexCompare );
1341    *setme_pex = pex;
1342
1343    managerUnlock( manager );
1344
1345    return peerCount;
1346}
1347
1348static int reconnectPulse( void * vtorrent );
1349
1350static int rechokePulse( void * vtorrent );
1351
1352void
1353tr_peerMgrStartTorrent( tr_peerMgr *    manager,
1354                        const uint8_t * torrentHash )
1355{
1356    Torrent * t;
1357
1358    managerLock( manager );
1359
1360    t = getExistingTorrent( manager, torrentHash );
1361
1362    assert( t );
1363    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1364    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1365
1366    if( !t->isRunning )
1367    {
1368        t->isRunning = 1;
1369
1370        t->reconnectTimer = tr_timerNew( t->manager->session,
1371                                         reconnectPulse, t,
1372                                         RECONNECT_PERIOD_MSEC );
1373
1374        t->rechokeTimer = tr_timerNew( t->manager->session,
1375                                       rechokePulse, t,
1376                                       RECHOKE_PERIOD_MSEC );
1377
1378        reconnectPulse( t );
1379
1380        rechokePulse( t );
1381
1382        if( !tr_ptrArrayEmpty( t->webseeds ) )
1383            refillSoon( t );
1384    }
1385
1386    managerUnlock( manager );
1387}
1388
1389static void
1390stopTorrent( Torrent * t )
1391{
1392    assert( torrentIsLocked( t ) );
1393
1394    t->isRunning = 0;
1395    tr_timerFree( &t->rechokeTimer );
1396    tr_timerFree( &t->reconnectTimer );
1397
1398    /* disconnect the peers. */
1399    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)peerDestructor );
1400    tr_ptrArrayClear( t->peers );
1401
1402    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1403     * which removes the handshake from t->outgoingHandshakes... */
1404    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1405        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1406}
1407
1408void
1409tr_peerMgrStopTorrent( tr_peerMgr *    manager,
1410                       const uint8_t * torrentHash )
1411{
1412    managerLock( manager );
1413
1414    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1415
1416    managerUnlock( manager );
1417}
1418
1419void
1420tr_peerMgrAddTorrent( tr_peerMgr * manager,
1421                      tr_torrent * tor )
1422{
1423    Torrent * t;
1424
1425    managerLock( manager );
1426
1427    assert( tor );
1428    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1429
1430    t = torrentConstructor( manager, tor );
1431    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1432
1433    managerUnlock( manager );
1434}
1435
1436void
1437tr_peerMgrRemoveTorrent( tr_peerMgr *    manager,
1438                         const uint8_t * torrentHash )
1439{
1440    Torrent * t;
1441
1442    managerLock( manager );
1443
1444    t = getExistingTorrent( manager, torrentHash );
1445    assert( t );
1446    stopTorrent( t );
1447    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1448    torrentDestructor( t );
1449
1450    managerUnlock( manager );
1451}
1452
1453void
1454tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1455                               const uint8_t *    torrentHash,
1456                               int8_t *           tab,
1457                               unsigned int       tabCount )
1458{
1459    tr_piece_index_t   i;
1460    const Torrent *    t;
1461    const tr_torrent * tor;
1462    float              interval;
1463    int                isComplete;
1464    int                peerCount;
1465    const tr_peer **   peers;
1466
1467    managerLock( manager );
1468
1469    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1470    tor = t->tor;
1471    interval = tor->info.pieceCount / (float)tabCount;
1472    isComplete = tor
1473                 && ( tr_cpGetStatus ( tor->completion ) == TR_CP_COMPLETE );
1474    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1475
1476    memset( tab, 0, tabCount );
1477
1478    for( i = 0; tor && i < tabCount; ++i )
1479    {
1480        const int piece = i * interval;
1481
1482        if( isComplete || tr_cpPieceIsComplete( tor->completion, piece ) )
1483            tab[i] = -1;
1484        else if( peerCount )
1485        {
1486            int j;
1487            for( j = 0; j < peerCount; ++j )
1488                if( tr_bitfieldHas( peers[j]->have, i ) )
1489                    ++tab[i];
1490        }
1491    }
1492
1493    managerUnlock( manager );
1494}
1495
1496/* Returns the pieces that are available from peers */
1497tr_bitfield*
1498tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1499                        const uint8_t *    torrentHash )
1500{
1501    int           i, size;
1502    Torrent *     t;
1503    tr_peer **    peers;
1504    tr_bitfield * pieces;
1505
1506    managerLock( manager );
1507
1508    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1509    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1510    peers = getConnectedPeers( t, &size );
1511    for( i = 0; i < size; ++i )
1512        tr_bitfieldOr( pieces, peers[i]->have );
1513
1514    managerUnlock( manager );
1515    tr_free( peers );
1516    return pieces;
1517}
1518
1519int
1520tr_peerMgrHasConnections( const tr_peerMgr * manager,
1521                          const uint8_t *    torrentHash )
1522{
1523    int             ret;
1524    const Torrent * t;
1525
1526    managerLock( manager );
1527
1528    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1529    ret = t && ( !tr_ptrArrayEmpty( t->peers )
1530               || !tr_ptrArrayEmpty( t->webseeds ) );
1531
1532    managerUnlock( manager );
1533    return ret;
1534}
1535
1536void
1537tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1538                        const uint8_t *    torrentHash,
1539                        int *              setmePeersKnown,
1540                        int *              setmePeersConnected,
1541                        int *              setmeSeedsConnected,
1542                        int *              setmeWebseedsSendingToUs,
1543                        int *              setmePeersSendingToUs,
1544                        int *              setmePeersGettingFromUs,
1545                        int *              setmePeersFrom,
1546                        double *           setmeRateToClient,
1547                        double *           setmeRateToPeers )
1548{
1549    int                 i, size;
1550    const Torrent *     t;
1551    const tr_peer **    peers;
1552    const tr_webseed ** webseeds;
1553
1554    managerLock( manager );
1555
1556    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1557    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1558
1559    *setmePeersKnown           = tr_ptrArraySize( t->pool );
1560    *setmePeersConnected       = 0;
1561    *setmeSeedsConnected       = 0;
1562    *setmePeersGettingFromUs   = 0;
1563    *setmePeersSendingToUs     = 0;
1564    *setmeWebseedsSendingToUs  = 0;
1565    *setmeRateToClient         = 0;
1566    *setmeRateToPeers          = 0;
1567
1568    for( i = 0; i < TR_PEER_FROM__MAX; ++i )
1569        setmePeersFrom[i] = 0;
1570
1571    for( i = 0; i < size; ++i )
1572    {
1573        const tr_peer *          peer = peers[i];
1574        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1575
1576        if( peer->io == NULL ) /* not connected */
1577            continue;
1578
1579        ++ * setmePeersConnected;
1580
1581        ++setmePeersFrom[atom->from];
1582
1583        if( clientIsDownloadingFrom( peer ) )
1584            ++ * setmePeersSendingToUs;
1585
1586        if( clientIsUploadingTo( peer ) )
1587            ++ * setmePeersGettingFromUs;
1588
1589        if( atom->flags & ADDED_F_SEED_FLAG )
1590            ++ * setmeSeedsConnected;
1591
1592        *setmeRateToClient += tr_peerIoGetRateToClient( peer->io );
1593
1594        *setmeRateToPeers += tr_peerIoGetRateToPeer( peer->io );
1595    }
1596
1597    webseeds = (const tr_webseed **) tr_ptrArrayPeek( t->webseeds, &size );
1598    for( i = 0; i < size; ++i )
1599    {
1600        if( tr_webseedIsActive( webseeds[i] ) )
1601            ++ * setmeWebseedsSendingToUs;
1602    }
1603
1604    managerUnlock( manager );
1605}
1606
1607double
1608tr_peerMgrGetRate( const tr_peerMgr * manager,
1609                   tr_direction       direction )
1610{
1611    int    i;
1612    double bytes = 0;
1613
1614    assert( manager != NULL );
1615    assert( direction == TR_UP || direction == TR_DOWN );
1616
1617    for( i = 0; i < BANDWIDTH_PULSE_HISTORY; ++i )
1618        bytes += manager->rateHistory[direction][i];
1619
1620    return ( BANDWIDTH_PULSES_PER_SECOND * bytes )
1621           / ( BANDWIDTH_PULSE_HISTORY * 1024 );
1622}
1623
1624float*
1625tr_peerMgrWebSpeeds( const tr_peerMgr * manager,
1626                     const uint8_t *    torrentHash )
1627{
1628    const Torrent *     t;
1629    const tr_webseed ** webseeds;
1630    int                 i;
1631    int                 webseedCount;
1632    float *             ret;
1633
1634    assert( manager );
1635    managerLock( manager );
1636
1637    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1638    webseeds = (const tr_webseed**) tr_ptrArrayPeek( t->webseeds,
1639                                                     &webseedCount );
1640    assert( webseedCount == t->tor->info.webseedCount );
1641    ret = tr_new0( float, webseedCount );
1642
1643    for( i = 0; i < webseedCount; ++i )
1644        if( !tr_webseedGetSpeed( webseeds[i], &ret[i] ) )
1645            ret[i] = -1.0;
1646
1647    managerUnlock( manager );
1648    return ret;
1649}
1650
1651struct tr_peer_stat *
1652tr_peerMgrPeerStats(
1653    const                          tr_peerMgr  * manager,
1654    const                          uint8_t     *
1655                                   torrentHash,
1656    int               * setmeCount UNUSED )
1657{
1658    int             i, size;
1659    const Torrent * t;
1660    tr_peer **      peers;
1661    tr_peer_stat *  ret;
1662
1663    assert( manager );
1664    managerLock( manager );
1665
1666    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1667    peers = getConnectedPeers( (Torrent*)t, &size );
1668    ret = tr_new0( tr_peer_stat, size );
1669
1670    for( i = 0; i < size; ++i )
1671    {
1672        char *                   pch;
1673        const tr_peer *          peer = peers[i];
1674        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1675        tr_peer_stat *           stat = ret + i;
1676
1677        tr_netNtop( &peer->in_addr, stat->addr, sizeof( stat->addr ) );
1678        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
1679                   sizeof( stat->client ) );
1680        stat->port               = peer->port;
1681        stat->from               = atom->from;
1682        stat->progress           = peer->progress;
1683        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1684        stat->rateToPeer         = tr_peerIoGetRateToPeer( peer->io );
1685        stat->rateToClient       = tr_peerIoGetRateToClient( peer->io );
1686        stat->peerIsChoked       = peer->peerIsChoked;
1687        stat->peerIsInterested   = peer->peerIsInterested;
1688        stat->clientIsChoked     = peer->clientIsChoked;
1689        stat->clientIsInterested = peer->clientIsInterested;
1690        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1691        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1692        stat->isUploadingTo      = clientIsUploadingTo( peer );
1693
1694        pch = stat->flagStr;
1695        if( t->optimistic == peer ) *pch++ = 'O';
1696        if( stat->isDownloadingFrom ) *pch++ = 'D';
1697        else if( stat->clientIsInterested ) *pch++ = 'd';
1698        if( stat->isUploadingTo ) *pch++ = 'U';
1699        else if( stat->peerIsInterested ) *pch++ = 'u';
1700        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ =
1701                'K';
1702        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1703        if( stat->isEncrypted ) *pch++ = 'E';
1704        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1705        if( stat->isIncoming ) *pch++ = 'I';
1706        *pch = '\0';
1707    }
1708
1709    *setmeCount = size;
1710    tr_free( peers );
1711
1712    managerUnlock( manager );
1713    return ret;
1714}
1715
1716/**
1717***
1718**/
1719
1720struct ChokeData
1721{
1722    unsigned int    doUnchoke    : 1;
1723    unsigned int    isInterested : 1;
1724    double          rateToClient;
1725    double          rateToPeer;
1726    tr_peer *       peer;
1727};
1728
1729static int
1730tr_compareDouble( double a,
1731                  double b )
1732{
1733    if( a < b ) return -1;
1734    if( a > b ) return 1;
1735    return 0;
1736}
1737
1738static int
1739compareChoke( const void * va,
1740              const void * vb )
1741{
1742    const struct ChokeData * a = va;
1743    const struct ChokeData * b = vb;
1744    int                      diff = 0;
1745
1746    if( diff == 0 ) /* prefer higher dl speeds */
1747        diff = -tr_compareDouble( a->rateToClient, b->rateToClient );
1748    if( diff == 0 ) /* prefer higher ul speeds */
1749        diff = -tr_compareDouble( a->rateToPeer, b->rateToPeer );
1750    if( diff == 0 ) /* prefer unchoked */
1751        diff = (int)a->peer->peerIsChoked - (int)b->peer->peerIsChoked;
1752
1753    return diff;
1754}
1755
1756static int
1757isNew( const tr_peer * peer )
1758{
1759    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1760}
1761
1762static int
1763isSame( const tr_peer * peer )
1764{
1765    return peer && peer->client && strstr( peer->client, "Transmission" );
1766}
1767
1768/**
1769***
1770**/
1771
1772static void
1773rechoke( Torrent * t )
1774{
1775    int                i, peerCount, size, unchokedInterested;
1776    tr_peer **         peers = getConnectedPeers( t, &peerCount );
1777    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1778
1779    assert( torrentIsLocked( t ) );
1780
1781    /* sort the peers by preference and rate */
1782    for( i = 0, size = 0; i < peerCount; ++i )
1783    {
1784        tr_peer * peer = peers[i];
1785        if( peer->progress >= 1.0 ) /* choke all seeds */
1786            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1787        else
1788        {
1789            struct ChokeData * node = &choke[size++];
1790            node->peer = peer;
1791            node->isInterested = peer->peerIsInterested;
1792            node->rateToClient = tr_peerIoGetRateToClient( peer->io );
1793            node->rateToPeer = tr_peerIoGetRateToPeer( peer->io );
1794        }
1795    }
1796
1797    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
1798
1799    /**
1800     * Reciprocation and number of uploads capping is managed by unchoking
1801     * the N peers which have the best upload rate and are interested.
1802     * This maximizes the client's download rate. These N peers are
1803     * referred to as downloaders, because they are interested in downloading
1804     * from the client.
1805     *
1806     * Peers which have a better upload rate (as compared to the downloaders)
1807     * but aren't interested get unchoked. If they become interested, the
1808     * downloader with the worst upload rate gets choked. If a client has
1809     * a complete file, it uses its upload rate rather than its download
1810     * rate to decide which peers to unchoke.
1811     */
1812    unchokedInterested = 0;
1813    for( i = 0; i < size && unchokedInterested < MAX_UNCHOKED_PEERS; ++i )
1814    {
1815        choke[i].doUnchoke = 1;
1816        if( choke[i].isInterested )
1817            ++unchokedInterested;
1818    }
1819
1820    /* optimistic unchoke */
1821    if( i < size )
1822    {
1823        int                n;
1824        struct ChokeData * c;
1825        tr_ptrArray *      randPool = tr_ptrArrayNew( );
1826
1827        for( ; i < size; ++i )
1828        {
1829            if( choke[i].isInterested )
1830            {
1831                const tr_peer * peer = choke[i].peer;
1832                int             x = 1, y;
1833                if( isNew( peer ) ) x *= 3;
1834                if( isSame( peer ) ) x *= 3;
1835                for( y = 0; y < x; ++y )
1836                    tr_ptrArrayAppend( randPool, &choke[i] );
1837            }
1838        }
1839
1840        if( ( n = tr_ptrArraySize( randPool ) ) )
1841        {
1842            c = tr_ptrArrayNth( randPool, tr_cryptoWeakRandInt( n ) );
1843            c->doUnchoke = 1;
1844            t->optimistic = c->peer;
1845        }
1846
1847        tr_ptrArrayFree( randPool, NULL );
1848    }
1849
1850    for( i = 0; i < size; ++i )
1851        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1852
1853    /* cleanup */
1854    tr_free( choke );
1855    tr_free( peers );
1856}
1857
1858static int
1859rechokePulse( void * vtorrent )
1860{
1861    Torrent * t = vtorrent;
1862
1863    torrentLock( t );
1864    rechoke( t );
1865    torrentUnlock( t );
1866    return TRUE;
1867}
1868
1869/***
1870****
1871****  Life and Death
1872****
1873***/
1874
1875static int
1876shouldPeerBeClosed( const Torrent * t,
1877                    const tr_peer * peer,
1878                    int             peerCount )
1879{
1880    const tr_torrent *       tor = t->tor;
1881    const time_t             now = time( NULL );
1882    const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1883
1884    /* if it's marked for purging, close it */
1885    if( peer->doPurge )
1886    {
1887        tordbg( t, "purging peer %s because its doPurge flag is set",
1888               tr_peerIoAddrStr( &atom->addr,
1889                                 atom->port ) );
1890        return TRUE;
1891    }
1892
1893    /* if we're seeding and the peer has everything we have,
1894     * and enough time has passed for a pex exchange, then disconnect */
1895    if( tr_torrentIsSeed( tor ) )
1896    {
1897        int peerHasEverything;
1898        if( atom->flags & ADDED_F_SEED_FLAG )
1899            peerHasEverything = TRUE;
1900        else if( peer->progress < tr_cpPercentDone( tor->completion ) )
1901            peerHasEverything = FALSE;
1902        else
1903        {
1904            tr_bitfield * tmp =
1905                tr_bitfieldDup( tr_cpPieceBitfield( tor->completion ) );
1906            tr_bitfieldDifference( tmp, peer->have );
1907            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
1908            tr_bitfieldFree( tmp );
1909        }
1910        if( peerHasEverything
1911          && ( !tr_torrentAllowsPex( tor ) || ( now - atom->time >= 30 ) ) )
1912        {
1913            tordbg( t, "purging peer %s because we're both seeds",
1914                   tr_peerIoAddrStr( &atom->addr,
1915                                     atom->port ) );
1916            return TRUE;
1917        }
1918    }
1919
1920    /* disconnect if it's been too long since piece data has been transferred.
1921     * this is on a sliding scale based on number of available peers... */
1922    {
1923        const int    relaxStrictnessIfFewerThanN =
1924            (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
1925        /* if we have >= relaxIfFewerThan, strictness is 100%.
1926         * if we have zero connections, strictness is 0% */
1927        const float  strictness = peerCount >= relaxStrictnessIfFewerThanN
1928                                  ? 1.0
1929                                  : peerCount /
1930                                  (float)relaxStrictnessIfFewerThanN;
1931        const int    lo = MIN_UPLOAD_IDLE_SECS;
1932        const int    hi = MAX_UPLOAD_IDLE_SECS;
1933        const int    limit = lo + ( ( hi - lo ) * strictness );
1934        const time_t then = peer->pieceDataActivityDate;
1935        const int    idleTime = then ? ( now - then ) : 0;
1936        if( idleTime > limit )
1937        {
1938            tordbg(
1939                t,
1940                "purging peer %s because it's been %d secs since we shared anything",
1941                tr_peerIoAddrStr( &atom->addr, atom->port ), idleTime );
1942            return TRUE;
1943        }
1944    }
1945
1946    return FALSE;
1947}
1948
1949static tr_peer **
1950getPeersToClose( Torrent * t,
1951                 int *     setmeSize )
1952{
1953    int               i, peerCount, outsize;
1954    tr_peer **        peers = (tr_peer**) tr_ptrArrayPeek( t->peers,
1955                                                           &peerCount );
1956    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
1957
1958    assert( torrentIsLocked( t ) );
1959
1960    for( i = outsize = 0; i < peerCount; ++i )
1961        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
1962            ret[outsize++] = peers[i];
1963
1964    *setmeSize = outsize;
1965    return ret;
1966}
1967
1968static int
1969compareCandidates( const void * va,
1970                   const void * vb )
1971{
1972    const struct peer_atom * a = *(const struct peer_atom**) va;
1973    const struct peer_atom * b = *(const struct peer_atom**) vb;
1974
1975    /* <Charles> Here we would probably want to try reconnecting to
1976     * peers that had most recently given us data. Lots of users have
1977     * trouble with resets due to their routers and/or ISPs. This way we
1978     * can quickly recover from an unwanted reset. So we sort
1979     * piece_data_time in descending order.
1980     */
1981
1982    if( a->piece_data_time != b->piece_data_time )
1983        return a->piece_data_time < b->piece_data_time ? 1 : -1;
1984
1985    if( a->numFails != b->numFails )
1986        return a->numFails < b->numFails ? -1 : 1;
1987
1988    if( a->time != b->time )
1989        return a->time < b->time ? -1 : 1;
1990
1991    return 0;
1992}
1993
1994static int
1995getReconnectIntervalSecs( const struct peer_atom * atom )
1996{
1997    int          sec;
1998    const time_t now = time( NULL );
1999
2000    /* if we were recently connected to this peer and transferring piece
2001     * data, try to reconnect to them sooner rather that later -- we don't
2002     * want network troubles to get in the way of a good peer. */
2003    if( ( now - atom->piece_data_time ) <=
2004       ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2005        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2006
2007    /* don't allow reconnects more often than our minimum */
2008    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2009        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2010
2011    /* otherwise, the interval depends on how many times we've tried
2012     * and failed to connect to the peer */
2013    else switch( atom->numFails )
2014        {
2015            case 0:
2016                sec = 0; break;
2017
2018            case 1:
2019                sec = 5; break;
2020
2021            case 2:
2022                sec = 2 * 60; break;
2023
2024            case 3:
2025                sec = 15 * 60; break;
2026
2027            case 4:
2028                sec = 30 * 60; break;
2029
2030            case 5:
2031                sec = 60 * 60; break;
2032
2033            default:
2034                sec = 120 * 60; break;
2035        }
2036
2037    return sec;
2038}
2039
2040static struct peer_atom **
2041getPeerCandidates(                               Torrent * t,
2042                                           int * setmeSize )
2043{
2044    int                 i, atomCount, retCount;
2045    struct peer_atom ** atoms;
2046    struct peer_atom ** ret;
2047    const time_t        now = time( NULL );
2048    const int           seed = tr_torrentIsSeed( t->tor );
2049
2050    assert( torrentIsLocked( t ) );
2051
2052    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &atomCount );
2053    ret = tr_new( struct peer_atom*, atomCount );
2054    for( i = retCount = 0; i < atomCount; ++i )
2055    {
2056        int                interval;
2057        struct peer_atom * atom = atoms[i];
2058
2059        /* peer fed us too much bad data ... we only keep it around
2060         * now to weed it out in case someone sends it to us via pex */
2061        if( atom->myflags & MYFLAG_BANNED )
2062            continue;
2063
2064        /* peer was unconnectable before, so we're not going to keep trying.
2065         * this is needs a separate flag from `banned', since if they try
2066         * to connect to us later, we'll let them in */
2067        if( atom->myflags & MYFLAG_UNREACHABLE )
2068            continue;
2069
2070        /* we don't need two connections to the same peer... */
2071        if( peerIsInUse( t, &atom->addr ) )
2072            continue;
2073
2074        /* no need to connect if we're both seeds... */
2075        if( seed && ( atom->flags & ADDED_F_SEED_FLAG ) )
2076            continue;
2077
2078        /* don't reconnect too often */
2079        interval = getReconnectIntervalSecs( atom );
2080        if( ( now - atom->time ) < interval )
2081        {
2082            tordbg(
2083                t,
2084                "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2085                i, tr_peerIoAddrStr( &atom->addr,
2086                                     atom->port ), interval );
2087            continue;
2088        }
2089
2090        /* Don't connect to peers in our blocklist */
2091        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2092            continue;
2093
2094        ret[retCount++] = atom;
2095    }
2096
2097    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2098    *setmeSize = retCount;
2099    return ret;
2100}
2101
2102static int
2103reconnectPulse( void * vtorrent )
2104{
2105    Torrent *     t = vtorrent;
2106    static time_t prevTime = 0;
2107    static int    newConnectionsThisSecond = 0;
2108    time_t        now;
2109
2110    torrentLock( t );
2111
2112    now = time( NULL );
2113    if( prevTime != now )
2114    {
2115        prevTime = now;
2116        newConnectionsThisSecond = 0;
2117    }
2118
2119    if( !t->isRunning )
2120    {
2121        removeAllPeers( t );
2122    }
2123    else
2124    {
2125        int                 i, nCandidates, nBad;
2126        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
2127        struct tr_peer **   connections = getPeersToClose( t, &nBad );
2128
2129        if( nBad || nCandidates )
2130            tordbg(
2131                t, "reconnect pulse for [%s]: %d bad connections, "
2132                   "%d connection candidates, %d atoms, max per pulse is %d",
2133                t->tor->info.name, nBad, nCandidates,
2134                tr_ptrArraySize( t->pool ),
2135                (int)MAX_RECONNECTIONS_PER_PULSE );
2136
2137        /* disconnect some peers.
2138           if we transferred piece data, then they might be good peers,
2139           so reset their `numFails' weight to zero.  otherwise we connected
2140           to them fruitlessly, so mark it as another fail */
2141        for( i = 0; i < nBad; ++i )
2142        {
2143            tr_peer *          peer = connections[i];
2144            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
2145            if( peer->pieceDataActivityDate )
2146                atom->numFails = 0;
2147            else
2148                ++atom->numFails;
2149            tordbg( t, "removing bad peer %s",
2150                   tr_peerIoGetAddrStr( peer->io ) );
2151            removePeer( t, peer );
2152        }
2153
2154        /* add some new ones */
2155        for( i = 0;    ( i < nCandidates )
2156           && ( i < MAX_RECONNECTIONS_PER_PULSE )
2157           && ( getPeerCount( t ) < getMaxPeerCount( t->tor ) )
2158           && ( newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND );
2159             ++i )
2160        {
2161            tr_peerMgr *       mgr = t->manager;
2162            struct peer_atom * atom = candidates[i];
2163            tr_peerIo *        io;
2164
2165            tordbg( t, "Starting an OUTGOING connection with %s",
2166                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2167
2168            io =
2169                tr_peerIoNewOutgoing( mgr->session, &atom->addr, atom->port,
2170                                      t->hash );
2171            if( io == NULL )
2172            {
2173                atom->myflags |= MYFLAG_UNREACHABLE;
2174            }
2175            else
2176            {
2177                tr_handshake * handshake = tr_handshakeNew(
2178                    io,
2179                    mgr->session->
2180                    encryptionMode,
2181                    myHandshakeDoneCB,
2182                    mgr );
2183
2184                assert( tr_peerIoGetTorrentHash( io ) );
2185
2186                ++newConnectionsThisSecond;
2187
2188                tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake,
2189                                         handshakeCompare );
2190            }
2191
2192            atom->time = time( NULL );
2193        }
2194
2195        /* cleanup */
2196        tr_free( connections );
2197        tr_free( candidates );
2198    }
2199
2200    torrentUnlock( t );
2201    return TRUE;
2202}
2203
2204/****
2205*****
2206*****
2207*****
2208****/
2209
2210static double
2211allocateHowMuch( double         desiredAvgKB,
2212                 const double * history )
2213{
2214    const double baseline = desiredAvgKB * 1024.0 /
2215                            BANDWIDTH_PULSES_PER_SECOND;
2216    const double min = baseline * 0.66;
2217    const double max = baseline * 1.33;
2218    int          i;
2219    double       usedBytes;
2220    double       n;
2221    double       clamped;
2222
2223    for( usedBytes = i = 0; i < BANDWIDTH_PULSE_HISTORY; ++i )
2224        usedBytes += history[i];
2225
2226    n =
2227        ( desiredAvgKB *
2228          1024.0 ) *
2229        ( BANDWIDTH_PULSE_HISTORY +
2230          1.0 ) / BANDWIDTH_PULSES_PER_SECOND - usedBytes;
2231
2232    /* clamp the return value to lessen oscillation */
2233    clamped = n;
2234    clamped = MAX( clamped, min );
2235    clamped = MIN( clamped, max );
2236/*fprintf( stderr, "desiredAvgKB is %.2f, rate is %.2f, allocating %.2f
2237  (%.2f)\n", desiredAvgKB,
2238  ((usedBytes*BANDWIDTH_PULSES_PER_SECOND)/BANDWIDTH_PULSE_HISTORY)/1024.0,
2239  clamped/1024.0, n/1024.0 );*/
2240    return clamped;
2241}
2242
2243/**
2244 * Distributes a fixed amount of bandwidth among a set of peers.
2245 *
2246 * @param peerArray peers whose client-to-peer bandwidth will be set
2247 * @param direction whether to allocate upload or download bandwidth
2248 * @param history recent bandwidth history for these peers
2249 * @param desiredAvgKB overall bandwidth goal for this set of peers
2250 */
2251static void
2252setPeerBandwidth( tr_ptrArray *      peerArray,
2253                  const tr_direction direction,
2254                  const double *     history,
2255                  double             desiredAvgKB )
2256{
2257    const int    peerCount = tr_ptrArraySize( peerArray );
2258    const double bytes = allocateHowMuch( desiredAvgKB, history );
2259    const double welfareBytes = MIN( 2048, bytes * 0.2 );
2260    const double meritBytes = MAX( 0, bytes - welfareBytes );
2261    tr_peer **   peers = (tr_peer**) tr_ptrArrayBase( peerArray );
2262    tr_peer **   candidates = tr_new( tr_peer *, peerCount );
2263    int          i;
2264    int          candidateCount;
2265    double       welfare;
2266    size_t       bytesUsed;
2267
2268    assert( meritBytes >= 0.0 );
2269    assert( welfareBytes >= 0.0 );
2270    assert( direction == TR_UP || direction == TR_DOWN );
2271
2272    for( i = candidateCount = 0; i < peerCount; ++i )
2273        if( tr_peerIoWantsBandwidth( peers[i]->io, direction ) )
2274            candidates[candidateCount++] = peers[i];
2275        else
2276            tr_peerIoSetBandwidth( peers[i]->io, direction, 0 );
2277
2278    for( i = bytesUsed = 0; i < candidateCount; ++i )
2279        bytesUsed += tr_peerIoGetBandwidthUsed( candidates[i]->io,
2280                                                direction );
2281
2282    welfare = welfareBytes / candidateCount;
2283
2284    for( i = 0; i < candidateCount; ++i )
2285    {
2286        tr_peer *    peer = candidates[i];
2287        const double merit = bytesUsed
2288                             ? ( meritBytes *
2289                                tr_peerIoGetBandwidthUsed( peer->io,
2290                                                           direction ) ) /
2291                             bytesUsed
2292                             : ( meritBytes / candidateCount );
2293        tr_peerIoSetBandwidth( peer->io, direction, merit + welfare );
2294    }
2295
2296    /* cleanup */
2297    tr_free( candidates );
2298}
2299
2300static size_t
2301countHandshakeBandwidth( tr_ptrArray * handshakes,
2302                         tr_direction  direction )
2303{
2304    const int n = tr_ptrArraySize( handshakes );
2305    int       i;
2306    size_t    total;
2307
2308    for( i = total = 0; i < n; ++i )
2309    {
2310        tr_peerIo * io = tr_handshakeGetIO( tr_ptrArrayNth( handshakes, i ) );
2311        total += tr_peerIoGetBandwidthUsed( io, direction );
2312    }
2313    return total;
2314}
2315
2316static size_t
2317countPeerBandwidth( tr_ptrArray * peers,
2318                    tr_direction  direction )
2319{
2320    const int n = tr_ptrArraySize( peers );
2321    int       i;
2322    size_t    total;
2323
2324    for( i = total = 0; i < n; ++i )
2325    {
2326        tr_peer * peer = tr_ptrArrayNth( peers, i );
2327        total += tr_peerIoGetBandwidthUsed( peer->io, direction );
2328    }
2329    return total;
2330}
2331
2332static void
2333givePeersUnlimitedBandwidth( tr_ptrArray * peers,
2334                             tr_direction  direction )
2335{
2336    const int n = tr_ptrArraySize( peers );
2337    int       i;
2338
2339    for( i = 0; i < n; ++i )
2340    {
2341        tr_peer * peer = tr_ptrArrayNth( peers, i );
2342        tr_peerIoSetBandwidthUnlimited( peer->io, direction );
2343    }
2344}
2345
2346static void
2347pumpAllPeers( tr_peerMgr * mgr )
2348{
2349    const int torrentCount = tr_ptrArraySize( mgr->torrents );
2350    int       i, j;
2351
2352    for( i = 0; i < torrentCount; ++i )
2353    {
2354        Torrent * t = tr_ptrArrayNth( mgr->torrents, i );
2355        const int peerCount = tr_ptrArraySize( t->peers );
2356        for( j = 0; j < peerCount; ++j )
2357        {
2358            tr_peer * peer = tr_ptrArrayNth( t->peers, j );
2359            tr_peerMsgsPulse( peer->msgs );
2360        }
2361    }
2362}
2363
2364/**
2365 * Allocate bandwidth for each peer connection.
2366 *
2367 * @param mgr the peer manager
2368 * @param direction whether to allocate upload or download bandwidth
2369 * @return the amount of directional bandwidth used since the last pulse.
2370 */
2371static double
2372allocateBandwidth( tr_peerMgr * mgr,
2373                   tr_direction direction )
2374{
2375    tr_session *  session = mgr->session;
2376    const int     pulseNumber = mgr->bandwidthPulseNumber;
2377    const int     torrentCount = tr_ptrArraySize( mgr->torrents );
2378    Torrent **    torrents = (Torrent **) tr_ptrArrayBase( mgr->torrents );
2379    tr_ptrArray * globalPool = tr_ptrArrayNew( );
2380    double        allBytesUsed = 0;
2381    size_t        poolBytesUsed = 0;
2382    int           i;
2383
2384    assert( mgr );
2385    assert( direction == TR_UP || direction == TR_DOWN );
2386
2387    /* before allocating bandwidth, pump the connected peers */
2388    pumpAllPeers( mgr );
2389
2390    for( i = 0; i < torrentCount; ++i )
2391    {
2392        Torrent *    t = torrents[i];
2393        const size_t used = countPeerBandwidth( t->peers, direction );
2394        +countHandshakeBandwidth( t->outgoingHandshakes,
2395                                  direction );
2396
2397        /* add this torrent's bandwidth use to allBytesUsed */
2398        allBytesUsed += used;
2399
2400        /* process the torrent's peers based on its speed mode */
2401        switch( tr_torrentGetSpeedMode( t->tor, direction ) )
2402        {
2403            case TR_SPEEDLIMIT_UNLIMITED:
2404                givePeersUnlimitedBandwidth( t->peers, direction );
2405                break;
2406
2407            case TR_SPEEDLIMIT_SINGLE:
2408                t->tor->rateHistory[direction][pulseNumber] = used;
2409
2410                setPeerBandwidth( t->peers, direction,
2411                                 t->tor->rateHistory[direction],
2412                                 tr_torrentGetSpeedLimit( t->tor,
2413                                                          direction ) );
2414                break;
2415
2416            case TR_SPEEDLIMIT_GLOBAL:
2417            {
2418                int       i;
2419                const int n = tr_ptrArraySize( t->peers );
2420                for( i = 0; i < n; ++i )
2421                    tr_ptrArrayAppend( globalPool,
2422                                      tr_ptrArrayNth( t->peers, i ) );
2423                poolBytesUsed += used;
2424                break;
2425            }
2426        }
2427    }
2428
2429    /* add incoming handshakes to the global pool */
2430    i = countHandshakeBandwidth( mgr->incomingHandshakes, direction );
2431    allBytesUsed += i;
2432    poolBytesUsed += i;
2433
2434    mgr->globalPoolHistory[direction][pulseNumber] = poolBytesUsed;
2435
2436    /* handle the global pool's connections */
2437    if( !tr_sessionIsSpeedLimitEnabled( session, direction ) )
2438        givePeersUnlimitedBandwidth( globalPool, direction );
2439    else
2440        setPeerBandwidth( globalPool, direction,
2441                         mgr->globalPoolHistory[direction],
2442                         tr_sessionGetSpeedLimit( session, direction ) );
2443
2444    /* now that we've allocated bandwidth, pump all the connected peers */
2445    pumpAllPeers( mgr );
2446
2447    /* cleanup */
2448    tr_ptrArrayFree( globalPool, NULL );
2449    return allBytesUsed;
2450}
2451
2452static int
2453bandwidthPulse( void * vmgr )
2454{
2455    tr_peerMgr * mgr = vmgr;
2456    int          i;
2457
2458    managerLock( mgr );
2459
2460    /* keep track of how far we are into the cycle */
2461    if( ++mgr->bandwidthPulseNumber == BANDWIDTH_PULSE_HISTORY )
2462        mgr->bandwidthPulseNumber = 0;
2463
2464    /* allocate the upload and download bandwidth */
2465    for( i = 0; i < 2; ++i )
2466        mgr->rateHistory[i][mgr->bandwidthPulseNumber] =
2467            allocateBandwidth( mgr, i );
2468
2469    managerUnlock( mgr );
2470    return TRUE;
2471}
2472
Note: See TracBrowser for help on using the repository browser.