source: trunk/libtransmission/peer-mgr.c @ 6850

Last change on this file since 6850 was 6850, checked in by charles, 14 years ago

fix a null pointer dereference

  • Property svn:keywords set to Date Rev Author Id
File size: 69.9 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 6850 2008-10-05 22:51:18Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp, strstr */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "blocklist.h"
23#include "clients.h"
24#include "completion.h"
25#include "crypto.h"
26#include "handshake.h"
27#include "inout.h" /* tr_ioTestPiece */
28#include "net.h"
29#include "peer-io.h"
30#include "peer-mgr.h"
31#include "peer-mgr-private.h"
32#include "peer-msgs.h"
33#include "ptrarray.h"
34#include "ratecontrol.h"
35#include "stats.h" /* tr_statsAddDownloaded */
36#include "torrent.h"
37#include "trevent.h"
38#include "utils.h"
39#include "webseed.h"
40
41enum
42{
43    /* how frequently to change which peers are choked */
44    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
45
46    /* minimum interval for refilling peers' request lists */
47    REFILL_PERIOD_MSEC = 333,
48
49    /* when many peers are available, keep idle ones this long */
50    MIN_UPLOAD_IDLE_SECS = ( 60 * 3 ),
51
52    /* when few peers are available, keep idle ones this long */
53    MAX_UPLOAD_IDLE_SECS = ( 60 * 10 ),
54
55    /* how frequently to decide which peers live and die */
56    RECONNECT_PERIOD_MSEC = ( 2 * 1000 ),
57
58    /* max # of peers to ask fer per torrent per reconnect pulse */
59    MAX_RECONNECTIONS_PER_PULSE = 2,
60
61    /* max number of peers to ask for per second overall.
62    * this throttle is to avoid overloading the router */
63    MAX_CONNECTIONS_PER_SECOND = 4,
64
65    /* number of unchoked peers per torrent.
66     * FIXME: this probably ought to be configurable */
67    MAX_UNCHOKED_PEERS = 12,
68
69    /* number of bad pieces a peer is allowed to send before we ban them */
70    MAX_BAD_PIECES_PER_PEER = 3,
71
72    /* use for bitwise operations w/peer_atom.myflags */
73    MYFLAG_BANNED = 1,
74
75    /* unreachable for now... but not banned.
76     * if they try to connect to us it's okay */
77    MYFLAG_UNREACHABLE = 2,
78
79    /* the minimum we'll wait before attempting to reconnect to a peer */
80    MINIMUM_RECONNECT_INTERVAL_SECS = 5
81};
82
83
84/**
85***
86**/
87
88/* We keep one of these for every peer we know about, whether
89 * it's connected or not, so the struct must be small.
90 * When our current connections underperform, we dip back
91 * into this list for new ones. */
92struct peer_atom
93{
94    uint8_t           from;
95    uint8_t           flags; /* these match the added_f flags */
96    uint8_t           myflags; /* flags that aren't defined in added_f */
97    uint16_t          port;
98    uint16_t          numFails;
99    struct in_addr    addr;
100    time_t            time; /* the last time the peer's connection status
101                              changed */
102    time_t            piece_data_time;
103};
104
105typedef struct
106{
107    uint8_t         hash[SHA_DIGEST_LENGTH];
108    tr_ptrArray *   outgoingHandshakes; /* tr_handshake */
109    tr_ptrArray *   pool; /* struct peer_atom */
110    tr_ptrArray *   peers; /* tr_peer */
111    tr_ptrArray *   webseeds; /* tr_webseed */
112    tr_timer *      reconnectTimer;
113    tr_timer *      rechokeTimer;
114    tr_timer *      refillTimer;
115    tr_torrent *    tor;
116    tr_peer *       optimistic; /* the optimistic peer, or NULL if none */
117    tr_bitfield *   requestedPieces;
118
119    unsigned int    isRunning : 1;
120
121    struct tr_peerMgr * manager;
122}
123Torrent;
124
125struct tr_peerMgr
126{
127    uint8_t        bandwidthPulseNumber;
128    tr_session *   session;
129    tr_ptrArray *  torrents; /* Torrent */
130    tr_ptrArray *  incomingHandshakes; /* tr_handshake */
131    tr_timer *     bandwidthTimer;
132    double         rateHistory[2][BANDWIDTH_PULSE_HISTORY];
133    double         globalPoolHistory[2][BANDWIDTH_PULSE_HISTORY];
134};
135
136#define tordbg( t, fmt... ) \
137    tr_deepLog( __FILE__, __LINE__, t->tor->info.name, ## fmt )
138
139#define dbgmsg( fmt... ) \
140    tr_deepLog( __FILE__, __LINE__, NULL, ## fmt )
141
142/**
143***
144**/
145
146static void
147managerLock( const struct tr_peerMgr * manager )
148{
149    tr_globalLock( manager->session );
150}
151
152static void
153managerUnlock( const struct tr_peerMgr * manager )
154{
155    tr_globalUnlock( manager->session );
156}
157
158static void
159torrentLock( Torrent * torrent )
160{
161    managerLock( torrent->manager );
162}
163
164static void
165torrentUnlock( Torrent * torrent )
166{
167    managerUnlock( torrent->manager );
168}
169
170static int
171torrentIsLocked( const Torrent * t )
172{
173    return tr_globalIsLocked( t->manager->session );
174}
175
176/**
177***
178**/
179
180static int
181compareAddresses( const struct in_addr * a,
182                  const struct in_addr * b )
183{
184    if( a->s_addr != b->s_addr )
185        return a->s_addr < b->s_addr ? -1 : 1;
186
187    return 0;
188}
189
190static int
191handshakeCompareToAddr( const void * va,
192                        const void * vb )
193{
194    const tr_handshake * a = va;
195
196    return compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
197}
198
199static int
200handshakeCompare( const void * a,
201                  const void * b )
202{
203    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
204}
205
206static tr_handshake*
207getExistingHandshake( tr_ptrArray *          handshakes,
208                      const struct in_addr * in_addr )
209{
210    return tr_ptrArrayFindSorted( handshakes,
211                                  in_addr,
212                                  handshakeCompareToAddr );
213}
214
215static int
216comparePeerAtomToAddress( const void * va,
217                          const void * vb )
218{
219    const struct peer_atom * a = va;
220
221    return compareAddresses( &a->addr, vb );
222}
223
224static int
225comparePeerAtoms( const void * va,
226                  const void * vb )
227{
228    const struct peer_atom * b = vb;
229
230    return comparePeerAtomToAddress( va, &b->addr );
231}
232
233/**
234***
235**/
236
237static int
238torrentCompare( const void * va,
239                const void * vb )
240{
241    const Torrent * a = va;
242    const Torrent * b = vb;
243
244    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
245}
246
247static int
248torrentCompareToHash( const void * va,
249                      const void * vb )
250{
251    const Torrent * a = va;
252    const uint8_t * b_hash = vb;
253
254    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
255}
256
257static Torrent*
258getExistingTorrent( tr_peerMgr *    manager,
259                    const uint8_t * hash )
260{
261    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
262                                             hash,
263                                             torrentCompareToHash );
264}
265
266static int
267peerCompare( const void * va,
268             const void * vb )
269{
270    const tr_peer * a = va;
271    const tr_peer * b = vb;
272
273    return compareAddresses( &a->in_addr, &b->in_addr );
274}
275
276static int
277peerCompareToAddr( const void * va,
278                   const void * vb )
279{
280    const tr_peer * a = va;
281
282    return compareAddresses( &a->in_addr, vb );
283}
284
285static tr_peer*
286getExistingPeer( Torrent *              torrent,
287                 const struct in_addr * in_addr )
288{
289    assert( torrentIsLocked( torrent ) );
290    assert( in_addr );
291
292    return tr_ptrArrayFindSorted( torrent->peers,
293                                  in_addr,
294                                  peerCompareToAddr );
295}
296
297static struct peer_atom*
298getExistingAtom( const                  Torrent * t,
299                 const struct in_addr * addr )
300{
301    assert( torrentIsLocked( t ) );
302    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
303}
304
305static int
306peerIsInUse( const Torrent *        ct,
307             const struct in_addr * addr )
308{
309    Torrent * t = (Torrent*) ct;
310
311    assert( torrentIsLocked ( t ) );
312
313    return getExistingPeer( t, addr )
314           || getExistingHandshake( t->outgoingHandshakes, addr )
315           || getExistingHandshake( t->manager->incomingHandshakes, addr );
316}
317
318static tr_peer*
319peerConstructor( const struct in_addr * in_addr )
320{
321    tr_peer * p;
322
323    p = tr_new0( tr_peer, 1 );
324    memcpy( &p->in_addr, in_addr, sizeof( struct in_addr ) );
325    return p;
326}
327
328static tr_peer*
329getPeer( Torrent *              torrent,
330         const struct in_addr * in_addr )
331{
332    tr_peer * peer;
333
334    assert( torrentIsLocked( torrent ) );
335
336    peer = getExistingPeer( torrent, in_addr );
337
338    if( peer == NULL )
339    {
340        peer = peerConstructor( in_addr );
341        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
342    }
343
344    return peer;
345}
346
347static void
348peerDestructor( tr_peer * peer )
349{
350    assert( peer );
351    assert( peer->msgs );
352
353    tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
354    tr_peerMsgsFree( peer->msgs );
355
356    tr_peerIoFree( peer->io );
357
358    tr_bitfieldFree( peer->have );
359    tr_bitfieldFree( peer->blame );
360    tr_free( peer->client );
361    tr_free( peer );
362}
363
364static void
365removePeer( Torrent * t,
366            tr_peer * peer )
367{
368    tr_peer *          removed;
369    struct peer_atom * atom;
370
371    assert( torrentIsLocked( t ) );
372
373    atom = getExistingAtom( t, &peer->in_addr );
374    assert( atom );
375    atom->time = time( NULL );
376
377    removed = tr_ptrArrayRemoveSorted( t->peers, peer, peerCompare );
378    assert( removed == peer );
379    peerDestructor( removed );
380}
381
382static void
383removeAllPeers( Torrent * t )
384{
385    while( !tr_ptrArrayEmpty( t->peers ) )
386        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
387}
388
389static void
390torrentDestructor( void * vt )
391{
392    Torrent * t = vt;
393    uint8_t   hash[SHA_DIGEST_LENGTH];
394
395    assert( t );
396    assert( !t->isRunning );
397    assert( t->peers );
398    assert( torrentIsLocked( t ) );
399    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
400    assert( tr_ptrArrayEmpty( t->peers ) );
401
402    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
403
404    tr_timerFree( &t->reconnectTimer );
405    tr_timerFree( &t->rechokeTimer );
406    tr_timerFree( &t->refillTimer );
407
408    tr_bitfieldFree( t->requestedPieces );
409    tr_ptrArrayFree( t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
410    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
411    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
412    tr_ptrArrayFree( t->peers, NULL );
413
414    tr_free( t );
415}
416
417static void peerCallbackFunc( void * vpeer,
418                              void * vevent,
419                              void * vt );
420
421static Torrent*
422torrentConstructor( tr_peerMgr * manager,
423                    tr_torrent * tor )
424{
425    int       i;
426    Torrent * t;
427
428    t = tr_new0( Torrent, 1 );
429    t->manager = manager;
430    t->tor = tor;
431    t->pool = tr_ptrArrayNew( );
432    t->peers = tr_ptrArrayNew( );
433    t->webseeds = tr_ptrArrayNew( );
434    t->outgoingHandshakes = tr_ptrArrayNew( );
435    t->requestedPieces = tr_bitfieldNew( tor->info.pieceCount );
436    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
437
438    for( i = 0; i < tor->info.webseedCount; ++i )
439    {
440        tr_webseed * w =
441            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc,
442                           t );
443        tr_ptrArrayAppend( t->webseeds, w );
444    }
445
446    return t;
447}
448
449/**
450 * For explanation, see http://www.bittorrent.org/fast_extensions.html
451 * Also see the "test-allowed-set" unit test
452 *
453 * @param k number of pieces in set
454 * @param sz number of pieces in the torrent
455 * @param infohash torrent's SHA1 hash
456 * @param ip peer's address
457 */
458struct tr_bitfield *
459tr_peerMgrGenerateAllowedSet(
460    const uint32_t         k,
461    const uint32_t         sz,
462    const                  uint8_t        *
463                           infohash,
464    const struct in_addr * ip )
465{
466    uint8_t       w[SHA_DIGEST_LENGTH + 4];
467    uint8_t       x[SHA_DIGEST_LENGTH];
468    tr_bitfield * a;
469    uint32_t      a_size;
470
471    *(uint32_t*)w = ntohl( htonl( ip->s_addr ) & 0xffffff00 );   /* (1) */
472    memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );              /* (2) */
473    tr_sha1( x, w, sizeof( w ), NULL );                        /* (3) */
474
475    a = tr_bitfieldNew( sz );
476    a_size = 0;
477
478    while( a_size < k )
479    {
480        int i;
481        for( i = 0; i < 5 && a_size < k; ++i )                      /* (4) */
482        {
483            uint32_t j = i * 4;                                /* (5) */
484            uint32_t y = ntohl( *( uint32_t* )( x + j ) );             /* (6) */
485            uint32_t index = y % sz;                           /* (7) */
486            if( !tr_bitfieldHas( a, index ) )                  /* (8) */
487            {
488                tr_bitfieldAdd( a, index );                    /* (9) */
489                ++a_size;
490            }
491        }
492        tr_sha1( x, x, sizeof( x ), NULL );                    /* (3) */
493    }
494
495    return a;
496}
497
498static int bandwidthPulse( void * vmgr );
499
500tr_peerMgr*
501tr_peerMgrNew( tr_session * session )
502{
503    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
504
505    m->session = session;
506    m->torrents = tr_ptrArrayNew( );
507    m->incomingHandshakes = tr_ptrArrayNew( );
508    m->bandwidthPulseNumber = -1;
509    m->bandwidthTimer = tr_timerNew( session, bandwidthPulse,
510                                     m, 1000 / BANDWIDTH_PULSES_PER_SECOND );
511    return m;
512}
513
514void
515tr_peerMgrFree( tr_peerMgr * manager )
516{
517    managerLock( manager );
518
519    tr_timerFree( &manager->bandwidthTimer );
520
521    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
522     * the item from manager->handshakes, so this is a little roundabout... */
523    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
524        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
525
526    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
527
528    /* free the torrents. */
529    tr_ptrArrayFree( manager->torrents, torrentDestructor );
530
531    managerUnlock( manager );
532    tr_free( manager );
533}
534
535static tr_peer**
536getConnectedPeers( Torrent * t,
537                   int *     setmeCount )
538{
539    int       i, peerCount, connectionCount;
540    tr_peer **peers;
541    tr_peer **ret;
542
543    assert( torrentIsLocked( t ) );
544
545    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
546    ret = tr_new( tr_peer *, peerCount );
547
548    for( i = connectionCount = 0; i < peerCount; ++i )
549        if( peers[i]->msgs )
550            ret[connectionCount++] = peers[i];
551
552    *setmeCount = connectionCount;
553    return ret;
554}
555
556static int
557clientIsDownloadingFrom( const tr_peer * peer )
558{
559    return peer->clientIsInterested && !peer->clientIsChoked;
560}
561
562static int
563clientIsUploadingTo( const tr_peer * peer )
564{
565    return peer->peerIsInterested && !peer->peerIsChoked;
566}
567
568/***
569****
570***/
571
572int
573tr_peerMgrPeerIsSeed( const tr_peerMgr *     mgr,
574                      const uint8_t *        torrentHash,
575                      const struct in_addr * addr )
576{
577    int                      isSeed = FALSE;
578    const Torrent *          t = NULL;
579    const struct peer_atom * atom = NULL;
580
581    t = getExistingTorrent( (tr_peerMgr*)mgr, torrentHash );
582    if( t )
583        atom = getExistingAtom( t, addr );
584    if( atom )
585        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
586
587    return isSeed;
588}
589
590/***
591****  Refill
592***/
593
594struct tr_refill_piece
595{
596    tr_priority_t    priority;
597    int              random;
598    uint32_t         piece;
599    uint32_t         peerCount;
600};
601
602static int
603compareRefillPiece( const void * aIn,
604                    const void * bIn )
605{
606    const struct tr_refill_piece * a = aIn;
607    const struct tr_refill_piece * b = bIn;
608
609    /* if one piece has a higher priority, it goes first */
610    if( a->priority != b->priority )
611        return a->priority > b->priority ? -1 : 1;
612
613    /* otherwise if one has fewer peers, it goes first */
614    if( a->peerCount != b->peerCount )
615        return a->peerCount < b->peerCount ? -1 : 1;
616
617    /* otherwise go with our random seed */
618    if( a->random != b->random )
619        return a->random < b->random ? -1 : 1;
620
621    return 0;
622}
623
624static int
625isPieceInteresting( const tr_torrent * tor,
626                    tr_piece_index_t   piece )
627{
628    if( tor->info.pieces[piece].dnd ) /* we don't want it */
629        return 0;
630
631    if( tr_cpPieceIsComplete( tor->completion, piece ) ) /* we have it */
632        return 0;
633
634    return 1;
635}
636
637static uint32_t*
638getPreferredPieces( Torrent *  t,
639                    uint32_t * pieceCount )
640{
641    const tr_torrent * tor = t->tor;
642    const tr_info *    inf = &tor->info;
643    tr_piece_index_t   i;
644    uint32_t           poolSize = 0;
645    uint32_t *         pool = tr_new( uint32_t, inf->pieceCount );
646    int                peerCount;
647    tr_peer**          peers;
648
649    assert( torrentIsLocked( t ) );
650
651    peers = getConnectedPeers( t, &peerCount );
652
653    for( i = 0; i < inf->pieceCount; ++i )
654        if( isPieceInteresting( tor, i ) )
655            pool[poolSize++] = i;
656
657    /* sort the pool from most interesting to least... */
658    if( poolSize > 1 )
659    {
660        uint32_t                 j;
661        struct tr_refill_piece * p = tr_new( struct tr_refill_piece,
662                                             poolSize );
663
664        for( j = 0; j < poolSize; ++j )
665        {
666            int                      k;
667            const tr_piece_index_t   piece = pool[j];
668            struct tr_refill_piece * setme = p + j;
669
670            setme->piece = piece;
671            setme->priority = inf->pieces[piece].priority;
672            setme->peerCount = 0;
673            setme->random = tr_cryptoWeakRandInt( INT_MAX );
674
675            for( k = 0; k < peerCount; ++k )
676            {
677                const tr_peer * peer = peers[k];
678                if( peer->peerIsInterested && !peer->clientIsChoked
679                  && tr_bitfieldHas( peer->have, piece ) )
680                    ++setme->peerCount;
681            }
682        }
683
684        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
685               compareRefillPiece );
686
687        for( j = 0; j < poolSize; ++j )
688            pool[j] = p[j].piece;
689
690        tr_free( p );
691    }
692
693    tr_free( peers );
694
695    *pieceCount = poolSize;
696    return pool;
697}
698
699static tr_peer**
700getPeersUploadingToClient( Torrent * t,
701                           int *     setmeCount )
702{
703    int        i;
704    int        peerCount = 0;
705    int        retCount = 0;
706    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
707    tr_peer ** ret = tr_new( tr_peer *, peerCount );
708
709    /* get a list of peers we're downloading from */
710    for( i = 0; i < peerCount; ++i )
711        if( clientIsDownloadingFrom( peers[i] ) )
712            ret[retCount++] = peers[i];
713
714    /* pick a different starting point each time so all peers
715     * get a chance at the first blocks in the queue */
716    if( retCount )
717    {
718        tr_peer ** tmp = tr_new( tr_peer *, retCount );
719        i = tr_cryptoWeakRandInt( retCount );
720        memcpy( tmp, ret, sizeof( tr_peer* ) * retCount );
721        memcpy( ret, tmp + i, sizeof( tr_peer* ) * ( retCount - i ) );
722        memcpy( ret + ( retCount - i ), tmp, sizeof( tr_peer* ) * i );
723        tr_free( tmp );
724    }
725
726    *setmeCount = retCount;
727    return ret;
728}
729
730static int
731refillPulse( void * vtorrent )
732{
733    Torrent *        t = vtorrent;
734    tr_torrent *     tor = t->tor;
735    int              peerCount;
736    int              webseedCount;
737    tr_peer **       peers;
738    tr_webseed **    webseeds;
739    uint32_t         pieceCount;
740    uint32_t *       pieces;
741    tr_piece_index_t i;
742
743    if( !t->isRunning )
744        return TRUE;
745    if( tr_torrentIsSeed( t->tor ) )
746        return TRUE;
747
748    torrentLock( t );
749    tordbg( t, "Refilling Request Buffers..." );
750
751    pieces = getPreferredPieces( t, &pieceCount );
752    peers = getPeersUploadingToClient( t, &peerCount );
753    webseedCount = tr_ptrArraySize( t->webseeds );
754    webseeds = tr_memdup( tr_ptrArrayBase(
755                             t->webseeds ), webseedCount *
756                         sizeof( tr_webseed* ) );
757
758    for( i = 0; ( webseedCount || peerCount ) && i < pieceCount; ++i )
759    {
760        int                    j;
761        int                    handled = FALSE;
762        const tr_piece_index_t piece = pieces[i];
763
764        assert( piece < tor->info.pieceCount );
765
766        /* find a peer who can ask for this piece */
767        for( j = 0; !handled && j < peerCount; )
768        {
769            const tr_addreq_t val = tr_peerMsgsAddRequest( peers[j]->msgs,
770                                                           piece );
771            switch( val )
772            {
773                case TR_ADDREQ_FULL:
774                case TR_ADDREQ_CLIENT_CHOKED:
775                    peers[j] = peers[--peerCount];
776                    break;
777
778                case TR_ADDREQ_MISSING:
779                case TR_ADDREQ_DUPLICATE:
780                    ++j;
781                    break;
782
783                case TR_ADDREQ_OK:
784                    tr_bitfieldAdd( t->requestedPieces, piece );
785                    handled = TRUE;
786                    break;
787
788                default:
789                    assert( 0 && "unhandled value" );
790                    break;
791            }
792        }
793
794        /* maybe one of the webseeds can do it */
795        for( j = 0; !handled && j < webseedCount; )
796        {
797            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j],
798                                                          piece );
799            switch( val )
800            {
801                case TR_ADDREQ_FULL:
802                    webseeds[j] = webseeds[--webseedCount];
803                    break;
804
805                case TR_ADDREQ_OK:
806                    tr_bitfieldAdd( t->requestedPieces, piece );
807                    handled = TRUE;
808                    break;
809
810                default:
811                    assert( 0 && "unhandled value" );
812                    break;
813            }
814        }
815    }
816
817    /* cleanup */
818    tr_free( webseeds );
819    tr_free( peers );
820    tr_free( pieces );
821
822    t->refillTimer = NULL;
823    torrentUnlock( t );
824    return FALSE;
825}
826
827static void
828addStrike( Torrent * t,
829           tr_peer * peer )
830{
831    tordbg( t, "increasing peer %s strike count to %d",
832            tr_peerIoAddrStr( &peer->in_addr,
833                              peer->port ), peer->strikes + 1 );
834
835    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
836    {
837        struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
838        atom->myflags |= MYFLAG_BANNED;
839        peer->doPurge = 1;
840        tordbg( t, "banning peer %s",
841               tr_peerIoAddrStr( &atom->addr, atom->port ) );
842    }
843}
844
845static void
846gotBadPiece( Torrent *        t,
847             tr_piece_index_t pieceIndex )
848{
849    tr_torrent *   tor = t->tor;
850    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
851
852    tor->corruptCur += byteCount;
853    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
854}
855
856static void
857refillSoon( Torrent * t )
858{
859    if( t->refillTimer == NULL )
860        t->refillTimer = tr_timerNew( t->manager->session,
861                                      refillPulse, t,
862                                      REFILL_PERIOD_MSEC );
863}
864
865static void
866peerCallbackFunc( void * vpeer,
867                  void * vevent,
868                  void * vt )
869{
870    tr_peer *             peer = vpeer; /* may be NULL if peer is a webseed */
871    Torrent *             t = (Torrent *) vt;
872    const tr_peer_event * e = vevent;
873
874    torrentLock( t );
875
876    switch( e->eventType )
877    {
878        case TR_PEER_NEED_REQ:
879            refillSoon( t );
880            break;
881
882        case TR_PEER_CANCEL:
883            tr_bitfieldRem( t->requestedPieces, e->pieceIndex );
884            break;
885
886        case TR_PEER_PEER_GOT_DATA:
887        {
888            const time_t now = time( NULL );
889            tr_torrent * tor = t->tor;
890            tor->activityDate = now;
891            tor->uploadedCur += e->length;
892            tr_statsAddUploaded( tor->session, e->length );
893            if( peer )
894            {
895                struct peer_atom * atom = getExistingAtom( t,
896                                                           &peer->in_addr );
897                atom->piece_data_time = time( NULL );
898            }
899            break;
900        }
901
902        case TR_PEER_CLIENT_GOT_DATA:
903        {
904            const time_t now = time( NULL );
905            tr_torrent * tor = t->tor;
906            tor->activityDate = now;
907            /* only add this to downloadedCur if we got it from a peer --
908             * webseeds shouldn't count against our ratio.  As one tracker
909             * admin put it, "Those pieces are downloaded directly from the
910             * content distributor, not the peers, it is the tracker's job
911             * to manage the swarms, not the web server and does not fit
912             * into the jurisdiction of the tracker." */
913            if( peer )
914                tor->downloadedCur += e->length;
915            tr_statsAddDownloaded( tor->session, e->length );
916            if( peer )
917            {
918                struct peer_atom * atom = getExistingAtom( t,
919                                                           &peer->in_addr );
920                atom->piece_data_time = time( NULL );
921            }
922            break;
923        }
924
925        case TR_PEER_PEER_PROGRESS:
926        {
927            if( peer )
928            {
929                struct peer_atom * atom = getExistingAtom( t,
930                                                           &peer->in_addr );
931                const int          peerIsSeed = e->progress >= 1.0;
932                if( peerIsSeed )
933                {
934                    tordbg( t, "marking peer %s as a seed",
935                           tr_peerIoAddrStr( &atom->addr,
936                                             atom->port ) );
937                    atom->flags |= ADDED_F_SEED_FLAG;
938                }
939                else
940                {
941                    tordbg( t, "marking peer %s as a non-seed",
942                           tr_peerIoAddrStr( &atom->addr,
943                                             atom->port ) );
944                    atom->flags &= ~ADDED_F_SEED_FLAG;
945                }
946            }
947            break;
948        }
949
950        case TR_PEER_CLIENT_GOT_BLOCK:
951        {
952            tr_torrent *     tor = t->tor;
953
954            tr_block_index_t block = _tr_block( tor, e->pieceIndex,
955                                                e->offset );
956
957            tr_cpBlockAdd( tor->completion, block );
958
959            if( tr_cpPieceIsComplete( tor->completion, e->pieceIndex ) )
960            {
961                const tr_piece_index_t p = e->pieceIndex;
962                const int              ok = tr_ioTestPiece( tor, p );
963
964                if( !ok )
965                {
966                    tr_torerr( tor,
967                              _( "Piece %lu, which was just downloaded, failed its checksum test" ),
968                              (unsigned long)p );
969                }
970
971                tr_torrentSetHasPiece( tor, p, ok );
972                tr_torrentSetPieceChecked( tor, p, TRUE );
973                tr_peerMgrSetBlame( tor->session->peerMgr, tor->info.hash, p, ok );
974
975                if( !ok )
976                    gotBadPiece( t, p );
977                else
978                {
979                    int        i, peerCount;
980                    tr_peer ** peers = getConnectedPeers( t, &peerCount );
981                    for( i = 0; i < peerCount; ++i )
982                        tr_peerMsgsHave( peers[i]->msgs, p );
983                    tr_free( peers );
984                }
985
986                tr_torrentRecheckCompleteness( tor );
987            }
988            break;
989        }
990
991        case TR_PEER_ERROR:
992            if( e->err == EINVAL )
993            {
994                addStrike( t, peer );
995                peer->doPurge = 1;
996            }
997            else if( ( e->err == EPROTO )
998                  || ( e->err == ERANGE )
999                  || ( e->err == EMSGSIZE )
1000                  || ( e->err == ENOTCONN ) )
1001            {
1002                /* some protocol error from the peer */
1003                peer->doPurge = 1;
1004            }
1005            else /* a local error, such as an IO error */
1006            {
1007                t->tor->error = e->err;
1008                tr_strlcpy( t->tor->errorString,
1009                            tr_strerror( t->tor->error ),
1010                            sizeof( t->tor->errorString ) );
1011                tr_torrentStop( t->tor );
1012            }
1013            break;
1014
1015        default:
1016            assert( 0 );
1017    }
1018
1019    torrentUnlock( t );
1020}
1021
1022static void
1023ensureAtomExists( Torrent *              t,
1024                  const struct in_addr * addr,
1025                  uint16_t               port,
1026                  uint8_t                flags,
1027                  uint8_t                from )
1028{
1029    if( getExistingAtom( t, addr ) == NULL )
1030    {
1031        struct peer_atom * a;
1032        a = tr_new0( struct peer_atom, 1 );
1033        a->addr = *addr;
1034        a->port = port;
1035        a->flags = flags;
1036        a->from = from;
1037        tordbg( t, "got a new atom: %s",
1038               tr_peerIoAddrStr( &a->addr, a->port ) );
1039        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
1040    }
1041}
1042
1043static int
1044getMaxPeerCount( const tr_torrent * tor )
1045{
1046    return tor->maxConnectedPeers;
1047}
1048
1049static int
1050getPeerCount( const Torrent * t )
1051{
1052    return tr_ptrArraySize( t->peers ) + tr_ptrArraySize(
1053               t->outgoingHandshakes );
1054}
1055
1056/* FIXME: this is kind of a mess. */
1057static int
1058myHandshakeDoneCB( tr_handshake *  handshake,
1059                   tr_peerIo *     io,
1060                   int             isConnected,
1061                   const uint8_t * peer_id,
1062                   void *          vmanager )
1063{
1064    int                    ok = isConnected;
1065    int                    success = FALSE;
1066    uint16_t               port;
1067    const struct in_addr * addr;
1068    tr_peerMgr *           manager = (tr_peerMgr*) vmanager;
1069    Torrent *              t;
1070    tr_handshake *         ours;
1071
1072    assert( io );
1073    assert( isConnected == 0 || isConnected == 1 );
1074
1075    t = tr_peerIoHasTorrentHash( io )
1076        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1077        : NULL;
1078
1079    if( tr_peerIoIsIncoming ( io ) )
1080        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
1081                                        handshake, handshakeCompare );
1082    else if( t )
1083        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
1084                                        handshake, handshakeCompare );
1085    else
1086        ours = handshake;
1087
1088    assert( ours );
1089    assert( ours == handshake );
1090
1091    if( t )
1092        torrentLock( t );
1093
1094    addr = tr_peerIoGetAddress( io, &port );
1095
1096    if( !ok || !t || !t->isRunning )
1097    {
1098        if( t )
1099        {
1100            struct peer_atom * atom = getExistingAtom( t, addr );
1101            if( atom )
1102                ++atom->numFails;
1103        }
1104
1105        tr_peerIoFree( io );
1106    }
1107    else /* looking good */
1108    {
1109        struct peer_atom * atom;
1110        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1111        atom = getExistingAtom( t, addr );
1112        atom->time = time( NULL );
1113
1114        if( atom->myflags & MYFLAG_BANNED )
1115        {
1116            tordbg( t, "banned peer %s tried to reconnect",
1117                   tr_peerIoAddrStr( &atom->addr,
1118                                     atom->port ) );
1119            tr_peerIoFree( io );
1120        }
1121        else if( tr_peerIoIsIncoming( io )
1122               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1123
1124        {
1125            tr_peerIoFree( io );
1126        }
1127        else
1128        {
1129            tr_peer * peer = getExistingPeer( t, addr );
1130
1131            if( peer ) /* we already have this peer */
1132            {
1133                tr_peerIoFree( io );
1134            }
1135            else
1136            {
1137                peer = getPeer( t, addr );
1138                tr_free( peer->client );
1139
1140                if( !peer_id )
1141                    peer->client = NULL;
1142                else
1143                {
1144                    char client[128];
1145                    tr_clientForId( client, sizeof( client ), peer_id );
1146                    peer->client = tr_strdup( client );
1147                }
1148                peer->port = port;
1149                peer->io = io;
1150                peer->msgs =
1151                    tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t,
1152                                    &peer->msgsTag );
1153
1154                success = TRUE;
1155            }
1156        }
1157    }
1158
1159    if( t )
1160        torrentUnlock( t );
1161
1162    return success;
1163}
1164
1165void
1166tr_peerMgrAddIncoming( tr_peerMgr *     manager,
1167                       struct in_addr * addr,
1168                       uint16_t         port,
1169                       int              socket )
1170{
1171    managerLock( manager );
1172
1173    if( tr_sessionIsAddressBlocked( manager->session, addr ) )
1174    {
1175        tr_dbg( "Banned IP address \"%s\" tried to connect to us",
1176               inet_ntoa( *addr ) );
1177        tr_netClose( socket );
1178    }
1179    else if( getExistingHandshake( manager->incomingHandshakes, addr ) )
1180    {
1181        tr_netClose( socket );
1182    }
1183    else /* we don't have a connetion to them yet... */
1184    {
1185        tr_peerIo *    io;
1186        tr_handshake * handshake;
1187
1188        io = tr_peerIoNewIncoming( manager->session, addr, port, socket );
1189
1190        handshake = tr_handshakeNew( io,
1191                                     manager->session->encryptionMode,
1192                                     myHandshakeDoneCB,
1193                                     manager );
1194
1195        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake,
1196                                 handshakeCompare );
1197    }
1198
1199    managerUnlock( manager );
1200}
1201
1202void
1203tr_peerMgrAddPex( tr_peerMgr *    manager,
1204                  const uint8_t * torrentHash,
1205                  uint8_t         from,
1206                  const tr_pex *  pex )
1207{
1208    Torrent * t;
1209
1210    managerLock( manager );
1211
1212    t = getExistingTorrent( manager, torrentHash );
1213    if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->in_addr ) )
1214        ensureAtomExists( t, &pex->in_addr, pex->port, pex->flags, from );
1215
1216    managerUnlock( manager );
1217}
1218
1219tr_pex *
1220tr_peerMgrCompactToPex( const void *    compact,
1221                        size_t          compactLen,
1222                        const uint8_t * added_f,
1223                        size_t          added_f_len,
1224                        size_t *        pexCount )
1225{
1226    size_t          i;
1227    size_t          n = compactLen / 6;
1228    const uint8_t * walk = compact;
1229    tr_pex *        pex = tr_new0( tr_pex, n );
1230
1231    for( i = 0; i < n; ++i )
1232    {
1233        memcpy( &pex[i].in_addr, walk, 4 ); walk += 4;
1234        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1235        if( added_f && ( n == added_f_len ) )
1236            pex[i].flags = added_f[i];
1237    }
1238
1239    *pexCount = n;
1240    return pex;
1241}
1242
1243/**
1244***
1245**/
1246
1247void
1248tr_peerMgrSetBlame( tr_peerMgr *     manager,
1249                    const uint8_t *  torrentHash,
1250                    tr_piece_index_t pieceIndex,
1251                    int              success )
1252{
1253    if( !success )
1254    {
1255        int        peerCount, i;
1256        Torrent *  t = getExistingTorrent( manager, torrentHash );
1257        tr_peer ** peers;
1258
1259        assert( torrentIsLocked( t ) );
1260
1261        peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1262        for( i = 0; i < peerCount; ++i )
1263        {
1264            tr_peer * peer = peers[i];
1265            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1266            {
1267                tordbg(
1268                    t,
1269                    "peer %s contributed to corrupt piece (%d); now has %d strikes",
1270                    tr_peerIoAddrStr( &peer->in_addr, peer->port ),
1271                    pieceIndex, (int)peer->strikes + 1 );
1272                addStrike( t, peer );
1273            }
1274        }
1275    }
1276}
1277
1278int
1279tr_pexCompare( const void * va,
1280               const void * vb )
1281{
1282    const tr_pex * a = va;
1283    const tr_pex * b = vb;
1284    int            i =
1285        memcmp( &a->in_addr, &b->in_addr, sizeof( struct in_addr ) );
1286
1287    if( i ) return i;
1288    if( a->port < b->port ) return -1;
1289    if( a->port > b->port ) return 1;
1290    return 0;
1291}
1292
1293int tr_pexCompare( const void * a,
1294                   const void * b );
1295
1296static int
1297peerPrefersCrypto( const tr_peer * peer )
1298{
1299    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1300        return TRUE;
1301
1302    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1303        return FALSE;
1304
1305    return tr_peerIoIsEncrypted( peer->io );
1306}
1307
1308int
1309tr_peerMgrGetPeers( tr_peerMgr *    manager,
1310                    const uint8_t * torrentHash,
1311                    tr_pex **       setme_pex )
1312{
1313    int peerCount = 0;
1314    const Torrent *  t;
1315
1316    managerLock( manager );
1317
1318    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1319    if( !t )
1320    {
1321        *setme_pex = NULL;
1322    }
1323    else
1324    {
1325        int i;
1326        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1327        tr_pex * pex = tr_new( tr_pex, peerCount );
1328        tr_pex * walk = pex;
1329
1330        for( i = 0; i < peerCount; ++i, ++walk )
1331        {
1332            const tr_peer * peer = peers[i];
1333            walk->in_addr = peer->in_addr;
1334            walk->port = peer->port;
1335            walk->flags = 0;
1336            if( peerPrefersCrypto( peer ) ) walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1337            if( peer->progress >= 1.0 ) walk->flags |= ADDED_F_SEED_FLAG;
1338        }
1339
1340        assert( ( walk - pex ) == peerCount );
1341        qsort( pex, peerCount, sizeof( tr_pex ), tr_pexCompare );
1342        *setme_pex = pex;
1343    }
1344
1345    managerUnlock( manager );
1346    return peerCount;
1347}
1348
1349static int reconnectPulse( void * vtorrent );
1350
1351static int rechokePulse( void * vtorrent );
1352
1353void
1354tr_peerMgrStartTorrent( tr_peerMgr *    manager,
1355                        const uint8_t * torrentHash )
1356{
1357    Torrent * t;
1358
1359    managerLock( manager );
1360
1361    t = getExistingTorrent( manager, torrentHash );
1362
1363    assert( t );
1364    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1365    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1366
1367    if( !t->isRunning )
1368    {
1369        t->isRunning = 1;
1370
1371        t->reconnectTimer = tr_timerNew( t->manager->session,
1372                                         reconnectPulse, t,
1373                                         RECONNECT_PERIOD_MSEC );
1374
1375        t->rechokeTimer = tr_timerNew( t->manager->session,
1376                                       rechokePulse, t,
1377                                       RECHOKE_PERIOD_MSEC );
1378
1379        reconnectPulse( t );
1380
1381        rechokePulse( t );
1382
1383        if( !tr_ptrArrayEmpty( t->webseeds ) )
1384            refillSoon( t );
1385    }
1386
1387    managerUnlock( manager );
1388}
1389
1390static void
1391stopTorrent( Torrent * t )
1392{
1393    assert( torrentIsLocked( t ) );
1394
1395    t->isRunning = 0;
1396    tr_timerFree( &t->rechokeTimer );
1397    tr_timerFree( &t->reconnectTimer );
1398
1399    /* disconnect the peers. */
1400    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)peerDestructor );
1401    tr_ptrArrayClear( t->peers );
1402
1403    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1404     * which removes the handshake from t->outgoingHandshakes... */
1405    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1406        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1407}
1408
1409void
1410tr_peerMgrStopTorrent( tr_peerMgr *    manager,
1411                       const uint8_t * torrentHash )
1412{
1413    managerLock( manager );
1414
1415    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1416
1417    managerUnlock( manager );
1418}
1419
1420void
1421tr_peerMgrAddTorrent( tr_peerMgr * manager,
1422                      tr_torrent * tor )
1423{
1424    Torrent * t;
1425
1426    managerLock( manager );
1427
1428    assert( tor );
1429    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1430
1431    t = torrentConstructor( manager, tor );
1432    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1433
1434    managerUnlock( manager );
1435}
1436
1437void
1438tr_peerMgrRemoveTorrent( tr_peerMgr *    manager,
1439                         const uint8_t * torrentHash )
1440{
1441    Torrent * t;
1442
1443    managerLock( manager );
1444
1445    t = getExistingTorrent( manager, torrentHash );
1446    assert( t );
1447    stopTorrent( t );
1448    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1449    torrentDestructor( t );
1450
1451    managerUnlock( manager );
1452}
1453
1454void
1455tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1456                               const uint8_t *    torrentHash,
1457                               int8_t *           tab,
1458                               unsigned int       tabCount )
1459{
1460    tr_piece_index_t   i;
1461    const Torrent *    t;
1462    const tr_torrent * tor;
1463    float              interval;
1464    int                isComplete;
1465    int                peerCount;
1466    const tr_peer **   peers;
1467
1468    managerLock( manager );
1469
1470    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1471    tor = t->tor;
1472    interval = tor->info.pieceCount / (float)tabCount;
1473    isComplete = tor
1474                 && ( tr_cpGetStatus ( tor->completion ) == TR_CP_COMPLETE );
1475    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1476
1477    memset( tab, 0, tabCount );
1478
1479    for( i = 0; tor && i < tabCount; ++i )
1480    {
1481        const int piece = i * interval;
1482
1483        if( isComplete || tr_cpPieceIsComplete( tor->completion, piece ) )
1484            tab[i] = -1;
1485        else if( peerCount )
1486        {
1487            int j;
1488            for( j = 0; j < peerCount; ++j )
1489                if( tr_bitfieldHas( peers[j]->have, i ) )
1490                    ++tab[i];
1491        }
1492    }
1493
1494    managerUnlock( manager );
1495}
1496
1497/* Returns the pieces that are available from peers */
1498tr_bitfield*
1499tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1500                        const uint8_t *    torrentHash )
1501{
1502    int           i, size;
1503    Torrent *     t;
1504    tr_peer **    peers;
1505    tr_bitfield * pieces;
1506
1507    managerLock( manager );
1508
1509    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1510    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1511    peers = getConnectedPeers( t, &size );
1512    for( i = 0; i < size; ++i )
1513        tr_bitfieldOr( pieces, peers[i]->have );
1514
1515    managerUnlock( manager );
1516    tr_free( peers );
1517    return pieces;
1518}
1519
1520int
1521tr_peerMgrHasConnections( const tr_peerMgr * manager,
1522                          const uint8_t *    torrentHash )
1523{
1524    int             ret;
1525    const Torrent * t;
1526
1527    managerLock( manager );
1528
1529    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1530    ret = t && ( !tr_ptrArrayEmpty( t->peers )
1531               || !tr_ptrArrayEmpty( t->webseeds ) );
1532
1533    managerUnlock( manager );
1534    return ret;
1535}
1536
1537void
1538tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1539                        const uint8_t *    torrentHash,
1540                        int *              setmePeersKnown,
1541                        int *              setmePeersConnected,
1542                        int *              setmeSeedsConnected,
1543                        int *              setmeWebseedsSendingToUs,
1544                        int *              setmePeersSendingToUs,
1545                        int *              setmePeersGettingFromUs,
1546                        int *              setmePeersFrom,
1547                        double *           setmeRateToClient,
1548                        double *           setmeRateToPeers )
1549{
1550    int                 i, size;
1551    const Torrent *     t;
1552    const tr_peer **    peers;
1553    const tr_webseed ** webseeds;
1554
1555    managerLock( manager );
1556
1557    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1558    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1559
1560    *setmePeersKnown           = tr_ptrArraySize( t->pool );
1561    *setmePeersConnected       = 0;
1562    *setmeSeedsConnected       = 0;
1563    *setmePeersGettingFromUs   = 0;
1564    *setmePeersSendingToUs     = 0;
1565    *setmeWebseedsSendingToUs  = 0;
1566    *setmeRateToClient         = 0;
1567    *setmeRateToPeers          = 0;
1568
1569    for( i = 0; i < TR_PEER_FROM__MAX; ++i )
1570        setmePeersFrom[i] = 0;
1571
1572    for( i = 0; i < size; ++i )
1573    {
1574        const tr_peer *          peer = peers[i];
1575        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1576
1577        if( peer->io == NULL ) /* not connected */
1578            continue;
1579
1580        ++ * setmePeersConnected;
1581
1582        ++setmePeersFrom[atom->from];
1583
1584        if( clientIsDownloadingFrom( peer ) )
1585            ++ * setmePeersSendingToUs;
1586
1587        if( clientIsUploadingTo( peer ) )
1588            ++ * setmePeersGettingFromUs;
1589
1590        if( atom->flags & ADDED_F_SEED_FLAG )
1591            ++ * setmeSeedsConnected;
1592
1593        *setmeRateToClient += tr_peerIoGetRateToClient( peer->io );
1594
1595        *setmeRateToPeers += tr_peerIoGetRateToPeer( peer->io );
1596    }
1597
1598    webseeds = (const tr_webseed **) tr_ptrArrayPeek( t->webseeds, &size );
1599    for( i = 0; i < size; ++i )
1600    {
1601        if( tr_webseedIsActive( webseeds[i] ) )
1602            ++ * setmeWebseedsSendingToUs;
1603    }
1604
1605    managerUnlock( manager );
1606}
1607
1608double
1609tr_peerMgrGetRate( const tr_peerMgr * manager,
1610                   tr_direction       direction )
1611{
1612    int    i;
1613    double bytes = 0;
1614
1615    assert( manager != NULL );
1616    assert( direction == TR_UP || direction == TR_DOWN );
1617
1618    for( i = 0; i < BANDWIDTH_PULSE_HISTORY; ++i )
1619        bytes += manager->rateHistory[direction][i];
1620
1621    return ( BANDWIDTH_PULSES_PER_SECOND * bytes )
1622           / ( BANDWIDTH_PULSE_HISTORY * 1024 );
1623}
1624
1625float*
1626tr_peerMgrWebSpeeds( const tr_peerMgr * manager,
1627                     const uint8_t *    torrentHash )
1628{
1629    const Torrent *     t;
1630    const tr_webseed ** webseeds;
1631    int                 i;
1632    int                 webseedCount;
1633    float *             ret;
1634
1635    assert( manager );
1636    managerLock( manager );
1637
1638    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1639    webseeds = (const tr_webseed**) tr_ptrArrayPeek( t->webseeds,
1640                                                     &webseedCount );
1641    assert( webseedCount == t->tor->info.webseedCount );
1642    ret = tr_new0( float, webseedCount );
1643
1644    for( i = 0; i < webseedCount; ++i )
1645        if( !tr_webseedGetSpeed( webseeds[i], &ret[i] ) )
1646            ret[i] = -1.0;
1647
1648    managerUnlock( manager );
1649    return ret;
1650}
1651
1652struct tr_peer_stat *
1653tr_peerMgrPeerStats(
1654    const                          tr_peerMgr  * manager,
1655    const                          uint8_t     *
1656                                   torrentHash,
1657    int               * setmeCount UNUSED )
1658{
1659    int             i, size;
1660    const Torrent * t;
1661    tr_peer **      peers;
1662    tr_peer_stat *  ret;
1663
1664    assert( manager );
1665    managerLock( manager );
1666
1667    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1668    peers = getConnectedPeers( (Torrent*)t, &size );
1669    ret = tr_new0( tr_peer_stat, size );
1670
1671    for( i = 0; i < size; ++i )
1672    {
1673        char *                   pch;
1674        const tr_peer *          peer = peers[i];
1675        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1676        tr_peer_stat *           stat = ret + i;
1677
1678        tr_netNtop( &peer->in_addr, stat->addr, sizeof( stat->addr ) );
1679        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
1680                   sizeof( stat->client ) );
1681        stat->port               = peer->port;
1682        stat->from               = atom->from;
1683        stat->progress           = peer->progress;
1684        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1685        stat->rateToPeer         = tr_peerIoGetRateToPeer( peer->io );
1686        stat->rateToClient       = tr_peerIoGetRateToClient( peer->io );
1687        stat->peerIsChoked       = peer->peerIsChoked;
1688        stat->peerIsInterested   = peer->peerIsInterested;
1689        stat->clientIsChoked     = peer->clientIsChoked;
1690        stat->clientIsInterested = peer->clientIsInterested;
1691        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1692        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1693        stat->isUploadingTo      = clientIsUploadingTo( peer );
1694
1695        pch = stat->flagStr;
1696        if( t->optimistic == peer ) *pch++ = 'O';
1697        if( stat->isDownloadingFrom ) *pch++ = 'D';
1698        else if( stat->clientIsInterested ) *pch++ = 'd';
1699        if( stat->isUploadingTo ) *pch++ = 'U';
1700        else if( stat->peerIsInterested ) *pch++ = 'u';
1701        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ =
1702                'K';
1703        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1704        if( stat->isEncrypted ) *pch++ = 'E';
1705        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1706        if( stat->isIncoming ) *pch++ = 'I';
1707        *pch = '\0';
1708    }
1709
1710    *setmeCount = size;
1711    tr_free( peers );
1712
1713    managerUnlock( manager );
1714    return ret;
1715}
1716
1717/**
1718***
1719**/
1720
1721struct ChokeData
1722{
1723    unsigned int    doUnchoke    : 1;
1724    unsigned int    isInterested : 1;
1725    double          rateToClient;
1726    double          rateToPeer;
1727    tr_peer *       peer;
1728};
1729
1730static int
1731tr_compareDouble( double a,
1732                  double b )
1733{
1734    if( a < b ) return -1;
1735    if( a > b ) return 1;
1736    return 0;
1737}
1738
1739static int
1740compareChoke( const void * va,
1741              const void * vb )
1742{
1743    const struct ChokeData * a = va;
1744    const struct ChokeData * b = vb;
1745    int                      diff = 0;
1746
1747    if( diff == 0 ) /* prefer higher dl speeds */
1748        diff = -tr_compareDouble( a->rateToClient, b->rateToClient );
1749    if( diff == 0 ) /* prefer higher ul speeds */
1750        diff = -tr_compareDouble( a->rateToPeer, b->rateToPeer );
1751    if( diff == 0 ) /* prefer unchoked */
1752        diff = (int)a->peer->peerIsChoked - (int)b->peer->peerIsChoked;
1753
1754    return diff;
1755}
1756
1757static int
1758isNew( const tr_peer * peer )
1759{
1760    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1761}
1762
1763static int
1764isSame( const tr_peer * peer )
1765{
1766    return peer && peer->client && strstr( peer->client, "Transmission" );
1767}
1768
1769/**
1770***
1771**/
1772
1773static void
1774rechoke( Torrent * t )
1775{
1776    int                i, peerCount, size, unchokedInterested;
1777    tr_peer **         peers = getConnectedPeers( t, &peerCount );
1778    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1779
1780    assert( torrentIsLocked( t ) );
1781
1782    /* sort the peers by preference and rate */
1783    for( i = 0, size = 0; i < peerCount; ++i )
1784    {
1785        tr_peer * peer = peers[i];
1786        if( peer->progress >= 1.0 ) /* choke all seeds */
1787            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1788        else
1789        {
1790            struct ChokeData * node = &choke[size++];
1791            node->peer = peer;
1792            node->isInterested = peer->peerIsInterested;
1793            node->rateToClient = tr_peerIoGetRateToClient( peer->io );
1794            node->rateToPeer = tr_peerIoGetRateToPeer( peer->io );
1795        }
1796    }
1797
1798    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
1799
1800    /**
1801     * Reciprocation and number of uploads capping is managed by unchoking
1802     * the N peers which have the best upload rate and are interested.
1803     * This maximizes the client's download rate. These N peers are
1804     * referred to as downloaders, because they are interested in downloading
1805     * from the client.
1806     *
1807     * Peers which have a better upload rate (as compared to the downloaders)
1808     * but aren't interested get unchoked. If they become interested, the
1809     * downloader with the worst upload rate gets choked. If a client has
1810     * a complete file, it uses its upload rate rather than its download
1811     * rate to decide which peers to unchoke.
1812     */
1813    unchokedInterested = 0;
1814    for( i = 0; i < size && unchokedInterested < MAX_UNCHOKED_PEERS; ++i )
1815    {
1816        choke[i].doUnchoke = 1;
1817        if( choke[i].isInterested )
1818            ++unchokedInterested;
1819    }
1820
1821    /* optimistic unchoke */
1822    if( i < size )
1823    {
1824        int                n;
1825        struct ChokeData * c;
1826        tr_ptrArray *      randPool = tr_ptrArrayNew( );
1827
1828        for( ; i < size; ++i )
1829        {
1830            if( choke[i].isInterested )
1831            {
1832                const tr_peer * peer = choke[i].peer;
1833                int             x = 1, y;
1834                if( isNew( peer ) ) x *= 3;
1835                if( isSame( peer ) ) x *= 3;
1836                for( y = 0; y < x; ++y )
1837                    tr_ptrArrayAppend( randPool, &choke[i] );
1838            }
1839        }
1840
1841        if( ( n = tr_ptrArraySize( randPool ) ) )
1842        {
1843            c = tr_ptrArrayNth( randPool, tr_cryptoWeakRandInt( n ) );
1844            c->doUnchoke = 1;
1845            t->optimistic = c->peer;
1846        }
1847
1848        tr_ptrArrayFree( randPool, NULL );
1849    }
1850
1851    for( i = 0; i < size; ++i )
1852        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1853
1854    /* cleanup */
1855    tr_free( choke );
1856    tr_free( peers );
1857}
1858
1859static int
1860rechokePulse( void * vtorrent )
1861{
1862    Torrent * t = vtorrent;
1863
1864    torrentLock( t );
1865    rechoke( t );
1866    torrentUnlock( t );
1867    return TRUE;
1868}
1869
1870/***
1871****
1872****  Life and Death
1873****
1874***/
1875
1876static int
1877shouldPeerBeClosed( const Torrent * t,
1878                    const tr_peer * peer,
1879                    int             peerCount )
1880{
1881    const tr_torrent *       tor = t->tor;
1882    const time_t             now = time( NULL );
1883    const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1884
1885    /* if it's marked for purging, close it */
1886    if( peer->doPurge )
1887    {
1888        tordbg( t, "purging peer %s because its doPurge flag is set",
1889               tr_peerIoAddrStr( &atom->addr,
1890                                 atom->port ) );
1891        return TRUE;
1892    }
1893
1894    /* if we're seeding and the peer has everything we have,
1895     * and enough time has passed for a pex exchange, then disconnect */
1896    if( tr_torrentIsSeed( tor ) )
1897    {
1898        int peerHasEverything;
1899        if( atom->flags & ADDED_F_SEED_FLAG )
1900            peerHasEverything = TRUE;
1901        else if( peer->progress < tr_cpPercentDone( tor->completion ) )
1902            peerHasEverything = FALSE;
1903        else
1904        {
1905            tr_bitfield * tmp =
1906                tr_bitfieldDup( tr_cpPieceBitfield( tor->completion ) );
1907            tr_bitfieldDifference( tmp, peer->have );
1908            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
1909            tr_bitfieldFree( tmp );
1910        }
1911        if( peerHasEverything
1912          && ( !tr_torrentAllowsPex( tor ) || ( now - atom->time >= 30 ) ) )
1913        {
1914            tordbg( t, "purging peer %s because we're both seeds",
1915                   tr_peerIoAddrStr( &atom->addr,
1916                                     atom->port ) );
1917            return TRUE;
1918        }
1919    }
1920
1921    /* disconnect if it's been too long since piece data has been transferred.
1922     * this is on a sliding scale based on number of available peers... */
1923    {
1924        const int    relaxStrictnessIfFewerThanN =
1925            (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
1926        /* if we have >= relaxIfFewerThan, strictness is 100%.
1927         * if we have zero connections, strictness is 0% */
1928        const float  strictness = peerCount >= relaxStrictnessIfFewerThanN
1929                                  ? 1.0
1930                                  : peerCount /
1931                                  (float)relaxStrictnessIfFewerThanN;
1932        const int    lo = MIN_UPLOAD_IDLE_SECS;
1933        const int    hi = MAX_UPLOAD_IDLE_SECS;
1934        const int    limit = lo + ( ( hi - lo ) * strictness );
1935        const time_t then = peer->pieceDataActivityDate;
1936        const int    idleTime = then ? ( now - then ) : 0;
1937        if( idleTime > limit )
1938        {
1939            tordbg(
1940                t,
1941                "purging peer %s because it's been %d secs since we shared anything",
1942                tr_peerIoAddrStr( &atom->addr, atom->port ), idleTime );
1943            return TRUE;
1944        }
1945    }
1946
1947    return FALSE;
1948}
1949
1950static tr_peer **
1951getPeersToClose( Torrent * t,
1952                 int *     setmeSize )
1953{
1954    int               i, peerCount, outsize;
1955    tr_peer **        peers = (tr_peer**) tr_ptrArrayPeek( t->peers,
1956                                                           &peerCount );
1957    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
1958
1959    assert( torrentIsLocked( t ) );
1960
1961    for( i = outsize = 0; i < peerCount; ++i )
1962        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
1963            ret[outsize++] = peers[i];
1964
1965    *setmeSize = outsize;
1966    return ret;
1967}
1968
1969static int
1970compareCandidates( const void * va,
1971                   const void * vb )
1972{
1973    const struct peer_atom * a = *(const struct peer_atom**) va;
1974    const struct peer_atom * b = *(const struct peer_atom**) vb;
1975
1976    /* <Charles> Here we would probably want to try reconnecting to
1977     * peers that had most recently given us data. Lots of users have
1978     * trouble with resets due to their routers and/or ISPs. This way we
1979     * can quickly recover from an unwanted reset. So we sort
1980     * piece_data_time in descending order.
1981     */
1982
1983    if( a->piece_data_time != b->piece_data_time )
1984        return a->piece_data_time < b->piece_data_time ? 1 : -1;
1985
1986    if( a->numFails != b->numFails )
1987        return a->numFails < b->numFails ? -1 : 1;
1988
1989    if( a->time != b->time )
1990        return a->time < b->time ? -1 : 1;
1991
1992    return 0;
1993}
1994
1995static int
1996getReconnectIntervalSecs( const struct peer_atom * atom )
1997{
1998    int          sec;
1999    const time_t now = time( NULL );
2000
2001    /* if we were recently connected to this peer and transferring piece
2002     * data, try to reconnect to them sooner rather that later -- we don't
2003     * want network troubles to get in the way of a good peer. */
2004    if( ( now - atom->piece_data_time ) <=
2005       ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2006        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2007
2008    /* don't allow reconnects more often than our minimum */
2009    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2010        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2011
2012    /* otherwise, the interval depends on how many times we've tried
2013     * and failed to connect to the peer */
2014    else switch( atom->numFails )
2015        {
2016            case 0:
2017                sec = 0; break;
2018
2019            case 1:
2020                sec = 5; break;
2021
2022            case 2:
2023                sec = 2 * 60; break;
2024
2025            case 3:
2026                sec = 15 * 60; break;
2027
2028            case 4:
2029                sec = 30 * 60; break;
2030
2031            case 5:
2032                sec = 60 * 60; break;
2033
2034            default:
2035                sec = 120 * 60; break;
2036        }
2037
2038    return sec;
2039}
2040
2041static struct peer_atom **
2042getPeerCandidates(                               Torrent * t,
2043                                           int * setmeSize )
2044{
2045    int                 i, atomCount, retCount;
2046    struct peer_atom ** atoms;
2047    struct peer_atom ** ret;
2048    const time_t        now = time( NULL );
2049    const int           seed = tr_torrentIsSeed( t->tor );
2050
2051    assert( torrentIsLocked( t ) );
2052
2053    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &atomCount );
2054    ret = tr_new( struct peer_atom*, atomCount );
2055    for( i = retCount = 0; i < atomCount; ++i )
2056    {
2057        int                interval;
2058        struct peer_atom * atom = atoms[i];
2059
2060        /* peer fed us too much bad data ... we only keep it around
2061         * now to weed it out in case someone sends it to us via pex */
2062        if( atom->myflags & MYFLAG_BANNED )
2063            continue;
2064
2065        /* peer was unconnectable before, so we're not going to keep trying.
2066         * this is needs a separate flag from `banned', since if they try
2067         * to connect to us later, we'll let them in */
2068        if( atom->myflags & MYFLAG_UNREACHABLE )
2069            continue;
2070
2071        /* we don't need two connections to the same peer... */
2072        if( peerIsInUse( t, &atom->addr ) )
2073            continue;
2074
2075        /* no need to connect if we're both seeds... */
2076        if( seed && ( atom->flags & ADDED_F_SEED_FLAG ) )
2077            continue;
2078
2079        /* don't reconnect too often */
2080        interval = getReconnectIntervalSecs( atom );
2081        if( ( now - atom->time ) < interval )
2082        {
2083            tordbg(
2084                t,
2085                "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2086                i, tr_peerIoAddrStr( &atom->addr,
2087                                     atom->port ), interval );
2088            continue;
2089        }
2090
2091        /* Don't connect to peers in our blocklist */
2092        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2093            continue;
2094
2095        ret[retCount++] = atom;
2096    }
2097
2098    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2099    *setmeSize = retCount;
2100    return ret;
2101}
2102
2103static int
2104reconnectPulse( void * vtorrent )
2105{
2106    Torrent *     t = vtorrent;
2107    static time_t prevTime = 0;
2108    static int    newConnectionsThisSecond = 0;
2109    time_t        now;
2110
2111    torrentLock( t );
2112
2113    now = time( NULL );
2114    if( prevTime != now )
2115    {
2116        prevTime = now;
2117        newConnectionsThisSecond = 0;
2118    }
2119
2120    if( !t->isRunning )
2121    {
2122        removeAllPeers( t );
2123    }
2124    else
2125    {
2126        int                 i, nCandidates, nBad;
2127        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
2128        struct tr_peer **   connections = getPeersToClose( t, &nBad );
2129
2130        if( nBad || nCandidates )
2131            tordbg(
2132                t, "reconnect pulse for [%s]: %d bad connections, "
2133                   "%d connection candidates, %d atoms, max per pulse is %d",
2134                t->tor->info.name, nBad, nCandidates,
2135                tr_ptrArraySize( t->pool ),
2136                (int)MAX_RECONNECTIONS_PER_PULSE );
2137
2138        /* disconnect some peers.
2139           if we transferred piece data, then they might be good peers,
2140           so reset their `numFails' weight to zero.  otherwise we connected
2141           to them fruitlessly, so mark it as another fail */
2142        for( i = 0; i < nBad; ++i )
2143        {
2144            tr_peer *          peer = connections[i];
2145            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
2146            if( peer->pieceDataActivityDate )
2147                atom->numFails = 0;
2148            else
2149                ++atom->numFails;
2150            tordbg( t, "removing bad peer %s",
2151                   tr_peerIoGetAddrStr( peer->io ) );
2152            removePeer( t, peer );
2153        }
2154
2155        /* add some new ones */
2156        for( i = 0;    ( i < nCandidates )
2157           && ( i < MAX_RECONNECTIONS_PER_PULSE )
2158           && ( getPeerCount( t ) < getMaxPeerCount( t->tor ) )
2159           && ( newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND );
2160             ++i )
2161        {
2162            tr_peerMgr *       mgr = t->manager;
2163            struct peer_atom * atom = candidates[i];
2164            tr_peerIo *        io;
2165
2166            tordbg( t, "Starting an OUTGOING connection with %s",
2167                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2168
2169            io =
2170                tr_peerIoNewOutgoing( mgr->session, &atom->addr, atom->port,
2171                                      t->hash );
2172            if( io == NULL )
2173            {
2174                atom->myflags |= MYFLAG_UNREACHABLE;
2175            }
2176            else
2177            {
2178                tr_handshake * handshake = tr_handshakeNew(
2179                    io,
2180                    mgr->session->
2181                    encryptionMode,
2182                    myHandshakeDoneCB,
2183                    mgr );
2184
2185                assert( tr_peerIoGetTorrentHash( io ) );
2186
2187                ++newConnectionsThisSecond;
2188
2189                tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake,
2190                                         handshakeCompare );
2191            }
2192
2193            atom->time = time( NULL );
2194        }
2195
2196        /* cleanup */
2197        tr_free( connections );
2198        tr_free( candidates );
2199    }
2200
2201    torrentUnlock( t );
2202    return TRUE;
2203}
2204
2205/****
2206*****
2207*****
2208*****
2209****/
2210
2211static double
2212allocateHowMuch( double         desiredAvgKB,
2213                 const double * history )
2214{
2215    const double baseline = desiredAvgKB * 1024.0 /
2216                            BANDWIDTH_PULSES_PER_SECOND;
2217    const double min = baseline * 0.66;
2218    const double max = baseline * 1.33;
2219    int          i;
2220    double       usedBytes;
2221    double       n;
2222    double       clamped;
2223
2224    for( usedBytes = i = 0; i < BANDWIDTH_PULSE_HISTORY; ++i )
2225        usedBytes += history[i];
2226
2227    n =
2228        ( desiredAvgKB *
2229          1024.0 ) *
2230        ( BANDWIDTH_PULSE_HISTORY +
2231          1.0 ) / BANDWIDTH_PULSES_PER_SECOND - usedBytes;
2232
2233    /* clamp the return value to lessen oscillation */
2234    clamped = n;
2235    clamped = MAX( clamped, min );
2236    clamped = MIN( clamped, max );
2237/*fprintf( stderr, "desiredAvgKB is %.2f, rate is %.2f, allocating %.2f
2238  (%.2f)\n", desiredAvgKB,
2239  ((usedBytes*BANDWIDTH_PULSES_PER_SECOND)/BANDWIDTH_PULSE_HISTORY)/1024.0,
2240  clamped/1024.0, n/1024.0 );*/
2241    return clamped;
2242}
2243
2244/**
2245 * Distributes a fixed amount of bandwidth among a set of peers.
2246 *
2247 * @param peerArray peers whose client-to-peer bandwidth will be set
2248 * @param direction whether to allocate upload or download bandwidth
2249 * @param history recent bandwidth history for these peers
2250 * @param desiredAvgKB overall bandwidth goal for this set of peers
2251 */
2252static void
2253setPeerBandwidth( tr_ptrArray *      peerArray,
2254                  const tr_direction direction,
2255                  const double *     history,
2256                  double             desiredAvgKB )
2257{
2258    const int    peerCount = tr_ptrArraySize( peerArray );
2259    const double bytes = allocateHowMuch( desiredAvgKB, history );
2260    const double welfareBytes = MIN( 2048, bytes * 0.2 );
2261    const double meritBytes = MAX( 0, bytes - welfareBytes );
2262    tr_peer **   peers = (tr_peer**) tr_ptrArrayBase( peerArray );
2263    tr_peer **   candidates = tr_new( tr_peer *, peerCount );
2264    int          i;
2265    int          candidateCount;
2266    double       welfare;
2267    size_t       bytesUsed;
2268
2269    assert( meritBytes >= 0.0 );
2270    assert( welfareBytes >= 0.0 );
2271    assert( direction == TR_UP || direction == TR_DOWN );
2272
2273    for( i = candidateCount = 0; i < peerCount; ++i )
2274        if( tr_peerIoWantsBandwidth( peers[i]->io, direction ) )
2275            candidates[candidateCount++] = peers[i];
2276        else
2277            tr_peerIoSetBandwidth( peers[i]->io, direction, 0 );
2278
2279    for( i = bytesUsed = 0; i < candidateCount; ++i )
2280        bytesUsed += tr_peerIoGetBandwidthUsed( candidates[i]->io,
2281                                                direction );
2282
2283    welfare = welfareBytes / candidateCount;
2284
2285    for( i = 0; i < candidateCount; ++i )
2286    {
2287        tr_peer *    peer = candidates[i];
2288        const double merit = bytesUsed
2289                             ? ( meritBytes *
2290                                tr_peerIoGetBandwidthUsed( peer->io,
2291                                                           direction ) ) /
2292                             bytesUsed
2293                             : ( meritBytes / candidateCount );
2294        tr_peerIoSetBandwidth( peer->io, direction, merit + welfare );
2295    }
2296
2297    /* cleanup */
2298    tr_free( candidates );
2299}
2300
2301static size_t
2302countHandshakeBandwidth( tr_ptrArray * handshakes,
2303                         tr_direction  direction )
2304{
2305    const int n = tr_ptrArraySize( handshakes );
2306    int       i;
2307    size_t    total;
2308
2309    for( i = total = 0; i < n; ++i )
2310    {
2311        tr_peerIo * io = tr_handshakeGetIO( tr_ptrArrayNth( handshakes, i ) );
2312        total += tr_peerIoGetBandwidthUsed( io, direction );
2313    }
2314    return total;
2315}
2316
2317static size_t
2318countPeerBandwidth( tr_ptrArray * peers,
2319                    tr_direction  direction )
2320{
2321    const int n = tr_ptrArraySize( peers );
2322    int       i;
2323    size_t    total;
2324
2325    for( i = total = 0; i < n; ++i )
2326    {
2327        tr_peer * peer = tr_ptrArrayNth( peers, i );
2328        total += tr_peerIoGetBandwidthUsed( peer->io, direction );
2329    }
2330    return total;
2331}
2332
2333static void
2334givePeersUnlimitedBandwidth( tr_ptrArray * peers,
2335                             tr_direction  direction )
2336{
2337    const int n = tr_ptrArraySize( peers );
2338    int       i;
2339
2340    for( i = 0; i < n; ++i )
2341    {
2342        tr_peer * peer = tr_ptrArrayNth( peers, i );
2343        tr_peerIoSetBandwidthUnlimited( peer->io, direction );
2344    }
2345}
2346
2347static void
2348pumpAllPeers( tr_peerMgr * mgr )
2349{
2350    const int torrentCount = tr_ptrArraySize( mgr->torrents );
2351    int       i, j;
2352
2353    for( i = 0; i < torrentCount; ++i )
2354    {
2355        Torrent * t = tr_ptrArrayNth( mgr->torrents, i );
2356        const int peerCount = tr_ptrArraySize( t->peers );
2357        for( j = 0; j < peerCount; ++j )
2358        {
2359            tr_peer * peer = tr_ptrArrayNth( t->peers, j );
2360            tr_peerMsgsPulse( peer->msgs );
2361        }
2362    }
2363}
2364
2365/**
2366 * Allocate bandwidth for each peer connection.
2367 *
2368 * @param mgr the peer manager
2369 * @param direction whether to allocate upload or download bandwidth
2370 * @return the amount of directional bandwidth used since the last pulse.
2371 */
2372static double
2373allocateBandwidth( tr_peerMgr * mgr,
2374                   tr_direction direction )
2375{
2376    tr_session *  session = mgr->session;
2377    const int     pulseNumber = mgr->bandwidthPulseNumber;
2378    const int     torrentCount = tr_ptrArraySize( mgr->torrents );
2379    Torrent **    torrents = (Torrent **) tr_ptrArrayBase( mgr->torrents );
2380    tr_ptrArray * globalPool = tr_ptrArrayNew( );
2381    double        allBytesUsed = 0;
2382    size_t        poolBytesUsed = 0;
2383    int           i;
2384
2385    assert( mgr );
2386    assert( direction == TR_UP || direction == TR_DOWN );
2387
2388    /* before allocating bandwidth, pump the connected peers */
2389    pumpAllPeers( mgr );
2390
2391    for( i = 0; i < torrentCount; ++i )
2392    {
2393        Torrent *    t = torrents[i];
2394        const size_t used = countPeerBandwidth( t->peers, direction );
2395        +countHandshakeBandwidth( t->outgoingHandshakes,
2396                                  direction );
2397
2398        /* add this torrent's bandwidth use to allBytesUsed */
2399        allBytesUsed += used;
2400
2401        /* process the torrent's peers based on its speed mode */
2402        switch( tr_torrentGetSpeedMode( t->tor, direction ) )
2403        {
2404            case TR_SPEEDLIMIT_UNLIMITED:
2405                givePeersUnlimitedBandwidth( t->peers, direction );
2406                break;
2407
2408            case TR_SPEEDLIMIT_SINGLE:
2409                t->tor->rateHistory[direction][pulseNumber] = used;
2410
2411                setPeerBandwidth( t->peers, direction,
2412                                 t->tor->rateHistory[direction],
2413                                 tr_torrentGetSpeedLimit( t->tor,
2414                                                          direction ) );
2415                break;
2416
2417            case TR_SPEEDLIMIT_GLOBAL:
2418            {
2419                int       i;
2420                const int n = tr_ptrArraySize( t->peers );
2421                for( i = 0; i < n; ++i )
2422                    tr_ptrArrayAppend( globalPool,
2423                                      tr_ptrArrayNth( t->peers, i ) );
2424                poolBytesUsed += used;
2425                break;
2426            }
2427        }
2428    }
2429
2430    /* add incoming handshakes to the global pool */
2431    i = countHandshakeBandwidth( mgr->incomingHandshakes, direction );
2432    allBytesUsed += i;
2433    poolBytesUsed += i;
2434
2435    mgr->globalPoolHistory[direction][pulseNumber] = poolBytesUsed;
2436
2437    /* handle the global pool's connections */
2438    if( !tr_sessionIsSpeedLimitEnabled( session, direction ) )
2439        givePeersUnlimitedBandwidth( globalPool, direction );
2440    else
2441        setPeerBandwidth( globalPool, direction,
2442                         mgr->globalPoolHistory[direction],
2443                         tr_sessionGetSpeedLimit( session, direction ) );
2444
2445    /* now that we've allocated bandwidth, pump all the connected peers */
2446    pumpAllPeers( mgr );
2447
2448    /* cleanup */
2449    tr_ptrArrayFree( globalPool, NULL );
2450    return allBytesUsed;
2451}
2452
2453static int
2454bandwidthPulse( void * vmgr )
2455{
2456    tr_peerMgr * mgr = vmgr;
2457    int          i;
2458
2459    managerLock( mgr );
2460
2461    /* keep track of how far we are into the cycle */
2462    if( ++mgr->bandwidthPulseNumber == BANDWIDTH_PULSE_HISTORY )
2463        mgr->bandwidthPulseNumber = 0;
2464
2465    /* allocate the upload and download bandwidth */
2466    for( i = 0; i < 2; ++i )
2467        mgr->rateHistory[i][mgr->bandwidthPulseNumber] =
2468            allocateBandwidth( mgr, i );
2469
2470    managerUnlock( mgr );
2471    return TRUE;
2472}
2473
Note: See TracBrowser for help on using the repository browser.