source: trunk/libtransmission/peer-mgr.c @ 6957

Last change on this file since 6957 was 6957, checked in by charles, 13 years ago

(libT) minor code cleanup for r6954

  • Property svn:keywords set to Date Rev Author Id
File size: 72.7 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 6957 2008-10-25 15:19:46Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h> /* memcpy, memcmp, strstr */
16#include <stdlib.h> /* qsort */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "blocklist.h"
23#include "clients.h"
24#include "completion.h"
25#include "crypto.h"
26#include "handshake.h"
27#include "inout.h" /* tr_ioTestPiece */
28#include "net.h"
29#include "peer-io.h"
30#include "peer-mgr.h"
31#include "peer-mgr-private.h"
32#include "peer-msgs.h"
33#include "ptrarray.h"
34#include "stats.h" /* tr_statsAddDownloaded */
35#include "torrent.h"
36#include "trevent.h"
37#include "utils.h"
38#include "webseed.h"
39
40enum
41{
42    /* how frequently to change which peers are choked */
43    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
44
45    /* minimum interval for refilling peers' request lists */
46    REFILL_PERIOD_MSEC = 333,
47
48    /* when many peers are available, keep idle ones this long */
49    MIN_UPLOAD_IDLE_SECS = ( 60 * 3 ),
50
51    /* when few peers are available, keep idle ones this long */
52    MAX_UPLOAD_IDLE_SECS = ( 60 * 10 ),
53
54    /* how frequently to decide which peers live and die */
55    RECONNECT_PERIOD_MSEC = ( 2 * 1000 ),
56
57    /* max # of peers to ask fer per torrent per reconnect pulse */
58    MAX_RECONNECTIONS_PER_PULSE = 2,
59
60    /* max number of peers to ask for per second overall.
61    * this throttle is to avoid overloading the router */
62    MAX_CONNECTIONS_PER_SECOND = 4,
63
64    /* number of unchoked peers per torrent.
65     * FIXME: this probably ought to be configurable */
66    MAX_UNCHOKED_PEERS = 12,
67
68    /* number of bad pieces a peer is allowed to send before we ban them */
69    MAX_BAD_PIECES_PER_PEER = 3,
70
71    /* use for bitwise operations w/peer_atom.myflags */
72    MYFLAG_BANNED = 1,
73
74    /* unreachable for now... but not banned.
75     * if they try to connect to us it's okay */
76    MYFLAG_UNREACHABLE = 2,
77
78    /* the minimum we'll wait before attempting to reconnect to a peer */
79    MINIMUM_RECONNECT_INTERVAL_SECS = 5
80};
81
82
83/**
84***
85**/
86
87/* We keep one of these for every peer we know about, whether
88 * it's connected or not, so the struct must be small.
89 * When our current connections underperform, we dip back
90 * into this list for new ones. */
91struct peer_atom
92{
93    uint8_t           from;
94    uint8_t           flags; /* these match the added_f flags */
95    uint8_t           myflags; /* flags that aren't defined in added_f */
96    uint16_t          port;
97    uint16_t          numFails;
98    struct in_addr    addr;
99    time_t            time; /* when the peer's connection status last changed */
100    time_t            piece_data_time;
101};
102
103typedef struct
104{
105    unsigned int    isRunning : 1;
106
107    uint8_t         hash[SHA_DIGEST_LENGTH];
108    int         *   pendingRequestCount;
109    tr_ptrArray *   outgoingHandshakes; /* tr_handshake */
110    tr_ptrArray *   pool; /* struct peer_atom */
111    tr_ptrArray *   peers; /* tr_peer */
112    tr_ptrArray *   webseeds; /* tr_webseed */
113    tr_timer *      reconnectTimer;
114    tr_timer *      rechokeTimer;
115    tr_timer *      refillTimer;
116    tr_torrent *    tor;
117    tr_peer *       optimistic; /* the optimistic peer, or NULL if none */
118
119    struct tr_peerMgr * manager;
120}
121Torrent;
122
123struct tr_peerMgr
124{
125    uint8_t        bandwidthPulseNumber;
126    tr_session *   session;
127    tr_ptrArray *  torrents; /* Torrent */
128    tr_ptrArray *  incomingHandshakes; /* tr_handshake */
129    tr_timer *     bandwidthTimer;
130    double         rateHistory[2][BANDWIDTH_PULSE_HISTORY];
131    double         globalPoolHistory[2][BANDWIDTH_PULSE_HISTORY];
132};
133
134#define tordbg( t, ... ) \
135    tr_deepLog( __FILE__, __LINE__, t->tor->info.name, __VA_ARGS__ )
136
137#define dbgmsg( ... ) \
138    tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ )
139
140/**
141***
142**/
143
144static void
145managerLock( const struct tr_peerMgr * manager )
146{
147    tr_globalLock( manager->session );
148}
149
150static void
151managerUnlock( const struct tr_peerMgr * manager )
152{
153    tr_globalUnlock( manager->session );
154}
155
156static void
157torrentLock( Torrent * torrent )
158{
159    managerLock( torrent->manager );
160}
161
162static void
163torrentUnlock( Torrent * torrent )
164{
165    managerUnlock( torrent->manager );
166}
167
168static int
169torrentIsLocked( const Torrent * t )
170{
171    return tr_globalIsLocked( t->manager->session );
172}
173
174/**
175***
176**/
177
178static int
179compareAddresses( const struct in_addr * a,
180                  const struct in_addr * b )
181{
182    if( a->s_addr != b->s_addr )
183        return a->s_addr < b->s_addr ? -1 : 1;
184
185    return 0;
186}
187
188static int
189handshakeCompareToAddr( const void * va,
190                        const void * vb )
191{
192    const tr_handshake * a = va;
193
194    return compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
195}
196
197static int
198handshakeCompare( const void * a,
199                  const void * b )
200{
201    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
202}
203
204static tr_handshake*
205getExistingHandshake( tr_ptrArray *          handshakes,
206                      const struct in_addr * in_addr )
207{
208    return tr_ptrArrayFindSorted( handshakes,
209                                  in_addr,
210                                  handshakeCompareToAddr );
211}
212
213static int
214comparePeerAtomToAddress( const void * va,
215                          const void * vb )
216{
217    const struct peer_atom * a = va;
218
219    return compareAddresses( &a->addr, vb );
220}
221
222static int
223comparePeerAtoms( const void * va,
224                  const void * vb )
225{
226    const struct peer_atom * b = vb;
227
228    return comparePeerAtomToAddress( va, &b->addr );
229}
230
231/**
232***
233**/
234
235static int
236torrentCompare( const void * va,
237                const void * vb )
238{
239    const Torrent * a = va;
240    const Torrent * b = vb;
241
242    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
243}
244
245static int
246torrentCompareToHash( const void * va,
247                      const void * vb )
248{
249    const Torrent * a = va;
250    const uint8_t * b_hash = vb;
251
252    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
253}
254
255static Torrent*
256getExistingTorrent( tr_peerMgr *    manager,
257                    const uint8_t * hash )
258{
259    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
260                                             hash,
261                                             torrentCompareToHash );
262}
263
264static int
265peerCompare( const void * va,
266             const void * vb )
267{
268    const tr_peer * a = va;
269    const tr_peer * b = vb;
270
271    return compareAddresses( &a->in_addr, &b->in_addr );
272}
273
274static int
275peerCompareToAddr( const void * va,
276                   const void * vb )
277{
278    const tr_peer * a = va;
279
280    return compareAddresses( &a->in_addr, vb );
281}
282
283static tr_peer*
284getExistingPeer( Torrent *              torrent,
285                 const struct in_addr * in_addr )
286{
287    assert( torrentIsLocked( torrent ) );
288    assert( in_addr );
289
290    return tr_ptrArrayFindSorted( torrent->peers,
291                                  in_addr,
292                                  peerCompareToAddr );
293}
294
295static struct peer_atom*
296getExistingAtom( const                  Torrent * t,
297                 const struct in_addr * addr )
298{
299    assert( torrentIsLocked( t ) );
300    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
301}
302
303static int
304peerIsInUse( const Torrent *        ct,
305             const struct in_addr * addr )
306{
307    Torrent * t = (Torrent*) ct;
308
309    assert( torrentIsLocked ( t ) );
310
311    return getExistingPeer( t, addr )
312           || getExistingHandshake( t->outgoingHandshakes, addr )
313           || getExistingHandshake( t->manager->incomingHandshakes, addr );
314}
315
316static tr_peer*
317peerConstructor( const struct in_addr * in_addr )
318{
319    tr_peer * p;
320
321    p = tr_new0( tr_peer, 1 );
322    memcpy( &p->in_addr, in_addr, sizeof( struct in_addr ) );
323    return p;
324}
325
326static tr_peer*
327getPeer( Torrent *              torrent,
328         const struct in_addr * in_addr )
329{
330    tr_peer * peer;
331
332    assert( torrentIsLocked( torrent ) );
333
334    peer = getExistingPeer( torrent, in_addr );
335
336    if( peer == NULL )
337    {
338        peer = peerConstructor( in_addr );
339        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
340    }
341
342    return peer;
343}
344
345static void
346peerDestructor( tr_peer * peer )
347{
348    assert( peer );
349    assert( peer->msgs );
350
351    tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
352    tr_peerMsgsFree( peer->msgs );
353
354    tr_peerIoFree( peer->io );
355
356    tr_bitfieldFree( peer->have );
357    tr_bitfieldFree( peer->blame );
358    tr_free( peer->client );
359    tr_free( peer );
360}
361
362static void
363removePeer( Torrent * t,
364            tr_peer * peer )
365{
366    tr_peer *          removed;
367    struct peer_atom * atom;
368
369    assert( torrentIsLocked( t ) );
370
371    atom = getExistingAtom( t, &peer->in_addr );
372    assert( atom );
373    atom->time = time( NULL );
374
375    removed = tr_ptrArrayRemoveSorted( t->peers, peer, peerCompare );
376    assert( removed == peer );
377    peerDestructor( removed );
378}
379
380static void
381removeAllPeers( Torrent * t )
382{
383    while( !tr_ptrArrayEmpty( t->peers ) )
384        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
385}
386
387static void
388torrentDestructor( void * vt )
389{
390    Torrent * t = vt;
391    uint8_t   hash[SHA_DIGEST_LENGTH];
392
393    assert( t );
394    assert( !t->isRunning );
395    assert( t->peers );
396    assert( torrentIsLocked( t ) );
397    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
398    assert( tr_ptrArrayEmpty( t->peers ) );
399
400    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
401
402    tr_timerFree( &t->reconnectTimer );
403    tr_timerFree( &t->rechokeTimer );
404    tr_timerFree( &t->refillTimer );
405
406    tr_ptrArrayFree( t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
407    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
408    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
409    tr_ptrArrayFree( t->peers, NULL );
410
411    tr_free( t->pendingRequestCount );
412    tr_free( t );
413}
414
415static void peerCallbackFunc( void * vpeer,
416                              void * vevent,
417                              void * vt );
418
419static Torrent*
420torrentConstructor( tr_peerMgr * manager,
421                    tr_torrent * tor )
422{
423    int       i;
424    Torrent * t;
425
426    t = tr_new0( Torrent, 1 );
427    t->manager = manager;
428    t->tor = tor;
429    t->pool = tr_ptrArrayNew( );
430    t->peers = tr_ptrArrayNew( );
431    t->webseeds = tr_ptrArrayNew( );
432    t->outgoingHandshakes = tr_ptrArrayNew( );
433    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
434
435    for( i = 0; i < tor->info.webseedCount; ++i )
436    {
437        tr_webseed * w =
438            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc,
439                           t );
440        tr_ptrArrayAppend( t->webseeds, w );
441    }
442
443    return t;
444}
445
446/**
447 * For explanation, see http://www.bittorrent.org/fast_extensions.html
448 * Also see the "test-allowed-set" unit test
449 *
450 * @param k number of pieces in set
451 * @param sz number of pieces in the torrent
452 * @param infohash torrent's SHA1 hash
453 * @param ip peer's address
454 */
455struct tr_bitfield *
456tr_peerMgrGenerateAllowedSet(
457    const uint32_t         k,
458    const uint32_t         sz,
459    const                  uint8_t        *
460                           infohash,
461    const struct in_addr * ip )
462{
463    uint8_t       w[SHA_DIGEST_LENGTH + 4];
464    uint8_t       x[SHA_DIGEST_LENGTH];
465    tr_bitfield * a;
466    uint32_t      a_size;
467
468    *(uint32_t*)w = ntohl( htonl( ip->s_addr ) & 0xffffff00 );   /* (1) */
469    memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );              /* (2) */
470    tr_sha1( x, w, sizeof( w ), NULL );                        /* (3) */
471
472    a = tr_bitfieldNew( sz );
473    a_size = 0;
474
475    while( a_size < k )
476    {
477        int i;
478        for( i = 0; i < 5 && a_size < k; ++i )                      /* (4) */
479        {
480            uint32_t j = i * 4;                                /* (5) */
481            uint32_t y = ntohl( *( uint32_t* )( x + j ) );             /* (6) */
482            uint32_t index = y % sz;                           /* (7) */
483            if( !tr_bitfieldHas( a, index ) )                  /* (8) */
484            {
485                tr_bitfieldAdd( a, index );                    /* (9) */
486                ++a_size;
487            }
488        }
489        tr_sha1( x, x, sizeof( x ), NULL );                    /* (3) */
490    }
491
492    return a;
493}
494
495static int bandwidthPulse( void * vmgr );
496
497tr_peerMgr*
498tr_peerMgrNew( tr_session * session )
499{
500    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
501
502    m->session = session;
503    m->torrents = tr_ptrArrayNew( );
504    m->incomingHandshakes = tr_ptrArrayNew( );
505    m->bandwidthPulseNumber = -1;
506    m->bandwidthTimer = tr_timerNew( session, bandwidthPulse,
507                                     m, 1000 / BANDWIDTH_PULSES_PER_SECOND );
508    return m;
509}
510
511void
512tr_peerMgrFree( tr_peerMgr * manager )
513{
514    managerLock( manager );
515
516    tr_timerFree( &manager->bandwidthTimer );
517
518    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
519     * the item from manager->handshakes, so this is a little roundabout... */
520    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
521        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
522
523    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
524
525    /* free the torrents. */
526    tr_ptrArrayFree( manager->torrents, torrentDestructor );
527
528    managerUnlock( manager );
529    tr_free( manager );
530}
531
532static tr_peer**
533getConnectedPeers( Torrent * t,
534                   int *     setmeCount )
535{
536    int       i, peerCount, connectionCount;
537    tr_peer **peers;
538    tr_peer **ret;
539
540    assert( torrentIsLocked( t ) );
541
542    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
543    ret = tr_new( tr_peer *, peerCount );
544
545    for( i = connectionCount = 0; i < peerCount; ++i )
546        if( peers[i]->msgs )
547            ret[connectionCount++] = peers[i];
548
549    *setmeCount = connectionCount;
550    return ret;
551}
552
553static int
554clientIsDownloadingFrom( const tr_peer * peer )
555{
556    return peer->clientIsInterested && !peer->clientIsChoked;
557}
558
559static int
560clientIsUploadingTo( const tr_peer * peer )
561{
562    return peer->peerIsInterested && !peer->peerIsChoked;
563}
564
565/***
566****
567***/
568
569int
570tr_peerMgrPeerIsSeed( const tr_peerMgr *     mgr,
571                      const uint8_t *        torrentHash,
572                      const struct in_addr * addr )
573{
574    int                      isSeed = FALSE;
575    const Torrent *          t = NULL;
576    const struct peer_atom * atom = NULL;
577
578    t = getExistingTorrent( (tr_peerMgr*)mgr, torrentHash );
579    if( t )
580        atom = getExistingAtom( t, addr );
581    if( atom )
582        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
583
584    return isSeed;
585}
586
587/****
588*****
589*****  REFILL
590*****
591****/
592
593static void
594assertValidPiece( Torrent * t, tr_piece_index_t piece )
595{
596    assert( t );
597    assert( t->tor );
598    assert( piece < t->tor->info.pieceCount );
599}
600
601static int
602getPieceRequests( Torrent * t, tr_piece_index_t piece )
603{
604    assertValidPiece( t, piece );
605
606    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
607}
608
609static void
610incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
611{
612    assertValidPiece( t, piece );
613
614    if( t->pendingRequestCount == NULL )
615        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
616    t->pendingRequestCount[piece]++;
617}
618
619static void
620decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
621{
622    assertValidPiece( t, piece );
623
624    if( t->pendingRequestCount )
625        t->pendingRequestCount[piece]--;
626}
627
628struct tr_refill_piece
629{
630    tr_priority_t    priority;
631    uint32_t         piece;
632    uint32_t         peerCount;
633    int              random;
634    int              pendingRequestCount;
635    int              missingBlockCount;
636};
637
638static int
639compareRefillPiece( const void * aIn,
640                    const void * bIn )
641{
642    const struct tr_refill_piece * a = aIn;
643    const struct tr_refill_piece * b = bIn;
644
645    /* if one piece has a higher priority, it goes first */
646    if( a->priority != b->priority )
647        return a->priority > b->priority ? -1 : 1;
648
649    /* have a per-priority endgame */
650    if( a->pendingRequestCount != b->pendingRequestCount )
651        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
652
653    /* fewer missing pieces goes first */
654    if( a->missingBlockCount != b->missingBlockCount )
655        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
656
657    /* otherwise if one has fewer peers, it goes first */
658    if( a->peerCount != b->peerCount )
659        return a->peerCount < b->peerCount ? -1 : 1;
660
661    /* otherwise go with our random seed */
662    if( a->random != b->random )
663        return a->random < b->random ? -1 : 1;
664
665    return 0;
666}
667
668static tr_piece_index_t *
669getPreferredPieces( Torrent           * t,
670                    tr_piece_index_t  * pieceCount )
671{
672    const tr_torrent  * tor = t->tor;
673    const tr_info     * inf = &tor->info;
674    tr_piece_index_t    i;
675    tr_piece_index_t    poolSize = 0;
676    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
677    int                 peerCount;
678    tr_peer**           peers;
679
680    assert( torrentIsLocked( t ) );
681
682    peers = getConnectedPeers( t, &peerCount );
683
684    /* make a list of the pieces that we want but don't have */
685    for( i = 0; i < inf->pieceCount; ++i )
686        if( !tor->info.pieces[i].dnd
687                && !tr_cpPieceIsComplete( tor->completion, i ) )
688            pool[poolSize++] = i;
689
690    /* sort the pool by which to request next */
691    if( poolSize > 1 )
692    {
693        tr_piece_index_t j;
694        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
695
696        for( j = 0; j < poolSize; ++j )
697        {
698            int k;
699            const tr_piece_index_t piece = pool[j];
700            struct tr_refill_piece * setme = p + j;
701
702            setme->piece = piece;
703            setme->priority = inf->pieces[piece].priority;
704            setme->peerCount = 0;
705            setme->random = tr_cryptoWeakRandInt( INT_MAX );
706            setme->pendingRequestCount = getPieceRequests( t, piece );
707            setme->missingBlockCount
708                         = tr_cpMissingBlocksInPiece( tor->completion, piece );
709
710            for( k = 0; k < peerCount; ++k )
711            {
712                const tr_peer * peer = peers[k];
713                if( peer->peerIsInterested
714                        && !peer->clientIsChoked
715                        && tr_bitfieldHas( peer->have, piece ) )
716                    ++setme->peerCount;
717            }
718        }
719
720        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
721               compareRefillPiece );
722
723        for( j = 0; j < poolSize; ++j )
724            pool[j] = p[j].piece;
725
726        tr_free( p );
727    }
728
729    tr_free( peers );
730
731    *pieceCount = poolSize;
732    return pool;
733}
734
735struct tr_blockIterator
736{
737    Torrent * t;
738    tr_block_index_t blockIndex, blockCount, *blocks;
739    tr_piece_index_t pieceIndex, pieceCount, *pieces;
740};
741
742static struct tr_blockIterator*
743blockIteratorNew( Torrent * t )
744{
745    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
746    i->t = t;
747    i->pieces = getPreferredPieces( t, &i->pieceCount );
748    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCount );
749    return i;
750}
751
752static int
753blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
754{
755    int found;
756    Torrent * t = i->t;
757    tr_torrent * tor = t->tor;
758
759    while( ( i->blockIndex == i->blockCount )
760        && ( i->pieceIndex < i->pieceCount ) )
761    {
762        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
763        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
764        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
765        tr_block_index_t block;
766
767        assert( index < tor->info.pieceCount );
768
769        i->blockCount = 0;
770        i->blockIndex = 0;
771        for( block=b; block!=e; ++block )
772            if( !tr_cpBlockIsComplete( tor->completion, block ) )
773                i->blocks[i->blockCount++] = block;
774    }
775
776    if(( found = ( i->blockIndex < i->blockCount )))
777        *setme = i->blocks[i->blockIndex++];
778
779    return found;
780}
781
782static void
783blockIteratorFree( struct tr_blockIterator * i )
784{
785    tr_free( i->blocks );
786    tr_free( i->pieces );
787    tr_free( i );
788}
789
790static tr_peer**
791getPeersUploadingToClient( Torrent * t,
792                           int *     setmeCount )
793{
794    int j;
795    int peerCount = 0;
796    int retCount = 0;
797    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
798    tr_peer ** ret = tr_new( tr_peer *, peerCount );
799
800    j = 0; /* this is a temporary test to make sure we walk through all the peers */
801    if( peerCount )
802    {
803        /* Get a list of peers we're downloading from.
804           Pick a different starting point each time so all peers
805           get a chance at being the first in line */
806        const int fencepost = tr_cryptoWeakRandInt( peerCount );
807        int i = fencepost;
808        do {
809            if( clientIsDownloadingFrom( peers[i] ) )
810                ret[retCount++] = peers[i];
811            i = ( i + 1 ) % peerCount;
812            ++j;
813        } while( i != fencepost );
814    }
815    assert( j == peerCount );
816    *setmeCount = retCount;
817    return ret;
818}
819
820static uint32_t
821getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
822{
823    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
824    const uint64_t blockPos = tor->blockSize * b;
825    assert( blockPos >= piecePos );
826    return (uint32_t)( blockPos - piecePos );
827}
828
829static int
830refillPulse( void * vtorrent )
831{
832    tr_block_index_t block;
833    int peerCount;
834    int webseedCount;
835    tr_peer ** peers;
836    tr_webseed ** webseeds;
837    struct tr_blockIterator * blockIterator;
838    Torrent * t = vtorrent;
839    tr_torrent * tor = t->tor;
840
841    if( !t->isRunning )
842        return TRUE;
843    if( tr_torrentIsSeed( t->tor ) )
844        return TRUE;
845
846    torrentLock( t );
847    tordbg( t, "Refilling Request Buffers..." );
848
849    blockIterator = blockIteratorNew( t );
850    peers = getPeersUploadingToClient( t, &peerCount );
851    webseedCount = tr_ptrArraySize( t->webseeds );
852    webseeds = tr_memdup( tr_ptrArrayBase( t->webseeds ),
853                          webseedCount * sizeof( tr_webseed* ) );
854
855    while( ( webseedCount || peerCount )
856        && blockIteratorNext( blockIterator, &block ) )
857    {
858        int j;
859        int handled = FALSE;
860
861        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
862        const uint32_t offset = getBlockOffsetInPiece( tor, block );
863        const uint32_t length = tr_torBlockCountBytes( tor, block );
864
865        assert( tr_torrentReqIsValid( tor, index, offset, length ) );
866        assert( _tr_block( tor, index, offset ) == block );
867        assert( offset < tr_torPieceCountBytes( tor, index ) );
868        assert( (offset + length) <= tr_torPieceCountBytes( tor, index ) );
869
870        /* find a peer who can ask for this block */
871        for( j=0; !handled && j<peerCount; )
872        {
873            const int val = tr_peerMsgsAddRequest( peers[j]->msgs,
874                                                   index, offset, length );
875            switch( val )
876            {
877                case TR_ADDREQ_FULL:
878                case TR_ADDREQ_CLIENT_CHOKED:
879                    peers[j] = peers[--peerCount];
880                    break;
881
882                case TR_ADDREQ_MISSING:
883                case TR_ADDREQ_DUPLICATE:
884                    ++j;
885                    break;
886
887                case TR_ADDREQ_OK:
888                    incrementPieceRequests( t, index );
889                    handled = TRUE;
890                    break;
891
892                default:
893                    assert( 0 && "unhandled value" );
894                    break;
895            }
896        }
897
898        /* maybe one of the webseeds can do it */
899        for( j=0; !handled && j<webseedCount; )
900        {
901            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j],
902                                                          index, offset, length );
903            switch( val )
904            {
905                case TR_ADDREQ_FULL:
906                    webseeds[j] = webseeds[--webseedCount];
907                    break;
908
909                case TR_ADDREQ_OK:
910                    incrementPieceRequests( t, index );
911                    handled = TRUE;
912                    break;
913
914                default:
915                    assert( 0 && "unhandled value" );
916                    break;
917            }
918        }
919    }
920
921    /* cleanup */
922    blockIteratorFree( blockIterator );
923    tr_free( webseeds );
924    tr_free( peers );
925
926    t->refillTimer = NULL;
927    torrentUnlock( t );
928    return FALSE;
929}
930
931static void
932broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
933{
934    int i, size;
935    tr_peer ** peers;
936
937    assert( torrentIsLocked( t ) );
938
939    peers = getConnectedPeers( t, &size );
940    for( i=0; i<size; ++i )
941        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
942    tr_free( peers );
943}
944
945static void
946addStrike( Torrent * t,
947           tr_peer * peer )
948{
949    tordbg( t, "increasing peer %s strike count to %d",
950            tr_peerIoAddrStr( &peer->in_addr,
951                              peer->port ), peer->strikes + 1 );
952
953    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
954    {
955        struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
956        atom->myflags |= MYFLAG_BANNED;
957        peer->doPurge = 1;
958        tordbg( t, "banning peer %s",
959               tr_peerIoAddrStr( &atom->addr, atom->port ) );
960    }
961}
962
963static void
964gotBadPiece( Torrent *        t,
965             tr_piece_index_t pieceIndex )
966{
967    tr_torrent *   tor = t->tor;
968    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
969
970    tor->corruptCur += byteCount;
971    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
972}
973
974static void
975refillSoon( Torrent * t )
976{
977    if( t->refillTimer == NULL )
978        t->refillTimer = tr_timerNew( t->manager->session,
979                                      refillPulse, t,
980                                      REFILL_PERIOD_MSEC );
981}
982
983static void
984peerCallbackFunc( void * vpeer,
985                  void * vevent,
986                  void * vt )
987{
988    tr_peer *             peer = vpeer; /* may be NULL if peer is a webseed */
989    Torrent *             t = (Torrent *) vt;
990    const tr_peer_event * e = vevent;
991
992    torrentLock( t );
993
994    switch( e->eventType )
995    {
996        case TR_PEER_NEED_REQ:
997            refillSoon( t );
998            break;
999
1000        case TR_PEER_CANCEL:
1001            decrementPieceRequests( t, e->pieceIndex );
1002            break;
1003
1004        case TR_PEER_PEER_GOT_DATA:
1005        {
1006            const time_t now = time( NULL );
1007            tr_torrent * tor = t->tor;
1008            tor->activityDate = now;
1009            tor->uploadedCur += e->length;
1010            tr_statsAddUploaded( tor->session, e->length );
1011            if( peer )
1012            {
1013                struct peer_atom * a = getExistingAtom( t, &peer->in_addr );
1014                a->piece_data_time = now;
1015            }
1016            break;
1017        }
1018
1019        case TR_PEER_CLIENT_GOT_DATA:
1020        {
1021            const time_t now = time( NULL );
1022            tr_torrent * tor = t->tor;
1023            tor->activityDate = now;
1024            /* only add this to downloadedCur if we got it from a peer --
1025             * webseeds shouldn't count against our ratio.  As one tracker
1026             * admin put it, "Those pieces are downloaded directly from the
1027             * content distributor, not the peers, it is the tracker's job
1028             * to manage the swarms, not the web server and does not fit
1029             * into the jurisdiction of the tracker." */
1030            if( peer )
1031                tor->downloadedCur += e->length;
1032            tr_statsAddDownloaded( tor->session, e->length );
1033            if( peer ) {
1034                struct peer_atom * a = getExistingAtom( t, &peer->in_addr );
1035                a->piece_data_time = now;
1036            }
1037            break;
1038        }
1039
1040        case TR_PEER_PEER_PROGRESS:
1041        {
1042            if( peer )
1043            {
1044                struct peer_atom * atom = getExistingAtom( t,
1045                                                           &peer->in_addr );
1046                const int          peerIsSeed = e->progress >= 1.0;
1047                if( peerIsSeed )
1048                {
1049                    tordbg( t, "marking peer %s as a seed",
1050                           tr_peerIoAddrStr( &atom->addr,
1051                                             atom->port ) );
1052                    atom->flags |= ADDED_F_SEED_FLAG;
1053                }
1054                else
1055                {
1056                    tordbg( t, "marking peer %s as a non-seed",
1057                           tr_peerIoAddrStr( &atom->addr,
1058                                             atom->port ) );
1059                    atom->flags &= ~ADDED_F_SEED_FLAG;
1060                }
1061            }
1062            break;
1063        }
1064
1065        case TR_PEER_CLIENT_GOT_BLOCK:
1066        {
1067            tr_torrent *     tor = t->tor;
1068
1069            tr_block_index_t block = _tr_block( tor, e->pieceIndex,
1070                                                e->offset );
1071
1072            tr_cpBlockAdd( tor->completion, block );
1073            decrementPieceRequests( t, e->pieceIndex );
1074
1075            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1076
1077            if( tr_cpPieceIsComplete( tor->completion, e->pieceIndex ) )
1078            {
1079                const tr_piece_index_t p = e->pieceIndex;
1080                const int              ok = tr_ioTestPiece( tor, p );
1081
1082                if( !ok )
1083                {
1084                    tr_torerr( tor,
1085                              _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1086                              (unsigned long)p );
1087                }
1088
1089                tr_torrentSetHasPiece( tor, p, ok );
1090                tr_torrentSetPieceChecked( tor, p, TRUE );
1091                tr_peerMgrSetBlame( tor->session->peerMgr, tor->info.hash, p, ok );
1092
1093                if( !ok )
1094                    gotBadPiece( t, p );
1095                else
1096                {
1097                    int        i, peerCount;
1098                    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1099                    for( i = 0; i < peerCount; ++i )
1100                        tr_peerMsgsHave( peers[i]->msgs, p );
1101                    tr_free( peers );
1102                }
1103
1104                tr_torrentRecheckCompleteness( tor );
1105            }
1106            break;
1107        }
1108
1109        case TR_PEER_ERROR:
1110            if( e->err == EINVAL )
1111            {
1112                addStrike( t, peer );
1113                peer->doPurge = 1;
1114            }
1115            else if( ( e->err == ERANGE )
1116                  || ( e->err == EMSGSIZE )
1117                  || ( e->err == ENOTCONN ) )
1118            {
1119                /* some protocol error from the peer */
1120                peer->doPurge = 1;
1121            }
1122            else /* a local error, such as an IO error */
1123            {
1124                t->tor->error = e->err;
1125                tr_strlcpy( t->tor->errorString,
1126                            tr_strerror( t->tor->error ),
1127                            sizeof( t->tor->errorString ) );
1128                tr_torrentStop( t->tor );
1129            }
1130            break;
1131
1132        default:
1133            assert( 0 );
1134    }
1135
1136    torrentUnlock( t );
1137}
1138
1139static void
1140ensureAtomExists( Torrent *              t,
1141                  const struct in_addr * addr,
1142                  uint16_t               port,
1143                  uint8_t                flags,
1144                  uint8_t                from )
1145{
1146    if( getExistingAtom( t, addr ) == NULL )
1147    {
1148        struct peer_atom * a;
1149        a = tr_new0( struct peer_atom, 1 );
1150        a->addr = *addr;
1151        a->port = port;
1152        a->flags = flags;
1153        a->from = from;
1154        tordbg( t, "got a new atom: %s",
1155               tr_peerIoAddrStr( &a->addr, a->port ) );
1156        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
1157    }
1158}
1159
1160static int
1161getMaxPeerCount( const tr_torrent * tor )
1162{
1163    return tor->maxConnectedPeers;
1164}
1165
1166static int
1167getPeerCount( const Torrent * t )
1168{
1169    return tr_ptrArraySize( t->peers ) + tr_ptrArraySize(
1170               t->outgoingHandshakes );
1171}
1172
1173/* FIXME: this is kind of a mess. */
1174static int
1175myHandshakeDoneCB( tr_handshake *  handshake,
1176                   tr_peerIo *     io,
1177                   int             isConnected,
1178                   const uint8_t * peer_id,
1179                   void *          vmanager )
1180{
1181    int                    ok = isConnected;
1182    int                    success = FALSE;
1183    uint16_t               port;
1184    const struct in_addr * addr;
1185    tr_peerMgr *           manager = (tr_peerMgr*) vmanager;
1186    Torrent *              t;
1187    tr_handshake *         ours;
1188
1189    assert( io );
1190    assert( isConnected == 0 || isConnected == 1 );
1191
1192    t = tr_peerIoHasTorrentHash( io )
1193        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1194        : NULL;
1195
1196    if( tr_peerIoIsIncoming ( io ) )
1197        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
1198                                        handshake, handshakeCompare );
1199    else if( t )
1200        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
1201                                        handshake, handshakeCompare );
1202    else
1203        ours = handshake;
1204
1205    assert( ours );
1206    assert( ours == handshake );
1207
1208    if( t )
1209        torrentLock( t );
1210
1211    addr = tr_peerIoGetAddress( io, &port );
1212
1213    if( !ok || !t || !t->isRunning )
1214    {
1215        if( t )
1216        {
1217            struct peer_atom * atom = getExistingAtom( t, addr );
1218            if( atom )
1219                ++atom->numFails;
1220        }
1221
1222        tr_peerIoFree( io );
1223    }
1224    else /* looking good */
1225    {
1226        struct peer_atom * atom;
1227        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1228        atom = getExistingAtom( t, addr );
1229        atom->time = time( NULL );
1230
1231        if( atom->myflags & MYFLAG_BANNED )
1232        {
1233            tordbg( t, "banned peer %s tried to reconnect",
1234                   tr_peerIoAddrStr( &atom->addr,
1235                                     atom->port ) );
1236            tr_peerIoFree( io );
1237        }
1238        else if( tr_peerIoIsIncoming( io )
1239               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1240
1241        {
1242            tr_peerIoFree( io );
1243        }
1244        else
1245        {
1246            tr_peer * peer = getExistingPeer( t, addr );
1247
1248            if( peer ) /* we already have this peer */
1249            {
1250                tr_peerIoFree( io );
1251            }
1252            else
1253            {
1254                peer = getPeer( t, addr );
1255                tr_free( peer->client );
1256
1257                if( !peer_id )
1258                    peer->client = NULL;
1259                else
1260                {
1261                    char client[128];
1262                    tr_clientForId( client, sizeof( client ), peer_id );
1263                    peer->client = tr_strdup( client );
1264                }
1265                peer->port = port;
1266                peer->io = io;
1267                peer->msgs =
1268                    tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t,
1269                                    &peer->msgsTag );
1270
1271                success = TRUE;
1272            }
1273        }
1274    }
1275
1276    if( t )
1277        torrentUnlock( t );
1278
1279    return success;
1280}
1281
1282void
1283tr_peerMgrAddIncoming( tr_peerMgr *     manager,
1284                       struct in_addr * addr,
1285                       uint16_t         port,
1286                       int              socket )
1287{
1288    managerLock( manager );
1289
1290    if( tr_sessionIsAddressBlocked( manager->session, addr ) )
1291    {
1292        tr_dbg( "Banned IP address \"%s\" tried to connect to us",
1293               inet_ntoa( *addr ) );
1294        tr_netClose( socket );
1295    }
1296    else if( getExistingHandshake( manager->incomingHandshakes, addr ) )
1297    {
1298        tr_netClose( socket );
1299    }
1300    else /* we don't have a connetion to them yet... */
1301    {
1302        tr_peerIo *    io;
1303        tr_handshake * handshake;
1304
1305        io = tr_peerIoNewIncoming( manager->session, addr, port, socket );
1306
1307        handshake = tr_handshakeNew( io,
1308                                     manager->session->encryptionMode,
1309                                     myHandshakeDoneCB,
1310                                     manager );
1311
1312        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake,
1313                                 handshakeCompare );
1314    }
1315
1316    managerUnlock( manager );
1317}
1318
1319void
1320tr_peerMgrAddPex( tr_peerMgr *    manager,
1321                  const uint8_t * torrentHash,
1322                  uint8_t         from,
1323                  const tr_pex *  pex )
1324{
1325    Torrent * t;
1326
1327    managerLock( manager );
1328
1329    t = getExistingTorrent( manager, torrentHash );
1330    if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->in_addr ) )
1331        ensureAtomExists( t, &pex->in_addr, pex->port, pex->flags, from );
1332
1333    managerUnlock( manager );
1334}
1335
1336tr_pex *
1337tr_peerMgrCompactToPex( const void *    compact,
1338                        size_t          compactLen,
1339                        const uint8_t * added_f,
1340                        size_t          added_f_len,
1341                        size_t *        pexCount )
1342{
1343    size_t          i;
1344    size_t          n = compactLen / 6;
1345    const uint8_t * walk = compact;
1346    tr_pex *        pex = tr_new0( tr_pex, n );
1347
1348    for( i = 0; i < n; ++i )
1349    {
1350        memcpy( &pex[i].in_addr, walk, 4 ); walk += 4;
1351        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1352        if( added_f && ( n == added_f_len ) )
1353            pex[i].flags = added_f[i];
1354    }
1355
1356    *pexCount = n;
1357    return pex;
1358}
1359
1360/**
1361***
1362**/
1363
1364void
1365tr_peerMgrSetBlame( tr_peerMgr *     manager,
1366                    const uint8_t *  torrentHash,
1367                    tr_piece_index_t pieceIndex,
1368                    int              success )
1369{
1370    if( !success )
1371    {
1372        int        peerCount, i;
1373        Torrent *  t = getExistingTorrent( manager, torrentHash );
1374        tr_peer ** peers;
1375
1376        assert( torrentIsLocked( t ) );
1377
1378        peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1379        for( i = 0; i < peerCount; ++i )
1380        {
1381            tr_peer * peer = peers[i];
1382            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1383            {
1384                tordbg(
1385                    t,
1386                    "peer %s contributed to corrupt piece (%d); now has %d strikes",
1387                    tr_peerIoAddrStr( &peer->in_addr, peer->port ),
1388                    pieceIndex, (int)peer->strikes + 1 );
1389                addStrike( t, peer );
1390            }
1391        }
1392    }
1393}
1394
1395int
1396tr_pexCompare( const void * va,
1397               const void * vb )
1398{
1399    const tr_pex * a = va;
1400    const tr_pex * b = vb;
1401    int            i =
1402        memcmp( &a->in_addr, &b->in_addr, sizeof( struct in_addr ) );
1403
1404    if( i ) return i;
1405    if( a->port < b->port ) return -1;
1406    if( a->port > b->port ) return 1;
1407    return 0;
1408}
1409
1410int tr_pexCompare( const void * a,
1411                   const void * b );
1412
1413static int
1414peerPrefersCrypto( const tr_peer * peer )
1415{
1416    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1417        return TRUE;
1418
1419    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1420        return FALSE;
1421
1422    return tr_peerIoIsEncrypted( peer->io );
1423}
1424
1425int
1426tr_peerMgrGetPeers( tr_peerMgr *    manager,
1427                    const uint8_t * torrentHash,
1428                    tr_pex **       setme_pex )
1429{
1430    int peerCount = 0;
1431    const Torrent *  t;
1432
1433    managerLock( manager );
1434
1435    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1436    if( !t )
1437    {
1438        *setme_pex = NULL;
1439    }
1440    else
1441    {
1442        int i;
1443        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1444        tr_pex * pex = tr_new( tr_pex, peerCount );
1445        tr_pex * walk = pex;
1446
1447        for( i = 0; i < peerCount; ++i, ++walk )
1448        {
1449            const tr_peer * peer = peers[i];
1450            walk->in_addr = peer->in_addr;
1451            walk->port = peer->port;
1452            walk->flags = 0;
1453            if( peerPrefersCrypto( peer ) ) walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1454            if( peer->progress >= 1.0 ) walk->flags |= ADDED_F_SEED_FLAG;
1455        }
1456
1457        assert( ( walk - pex ) == peerCount );
1458        qsort( pex, peerCount, sizeof( tr_pex ), tr_pexCompare );
1459        *setme_pex = pex;
1460    }
1461
1462    managerUnlock( manager );
1463    return peerCount;
1464}
1465
1466static int reconnectPulse( void * vtorrent );
1467
1468static int rechokePulse( void * vtorrent );
1469
1470void
1471tr_peerMgrStartTorrent( tr_peerMgr *    manager,
1472                        const uint8_t * torrentHash )
1473{
1474    Torrent * t;
1475
1476    managerLock( manager );
1477
1478    t = getExistingTorrent( manager, torrentHash );
1479
1480    assert( t );
1481    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1482    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1483
1484    if( !t->isRunning )
1485    {
1486        t->isRunning = 1;
1487
1488        t->reconnectTimer = tr_timerNew( t->manager->session,
1489                                         reconnectPulse, t,
1490                                         RECONNECT_PERIOD_MSEC );
1491
1492        t->rechokeTimer = tr_timerNew( t->manager->session,
1493                                       rechokePulse, t,
1494                                       RECHOKE_PERIOD_MSEC );
1495
1496        reconnectPulse( t );
1497
1498        rechokePulse( t );
1499
1500        if( !tr_ptrArrayEmpty( t->webseeds ) )
1501            refillSoon( t );
1502    }
1503
1504    managerUnlock( manager );
1505}
1506
1507static void
1508stopTorrent( Torrent * t )
1509{
1510    assert( torrentIsLocked( t ) );
1511
1512    t->isRunning = 0;
1513    tr_timerFree( &t->rechokeTimer );
1514    tr_timerFree( &t->reconnectTimer );
1515
1516    /* disconnect the peers. */
1517    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)peerDestructor );
1518    tr_ptrArrayClear( t->peers );
1519
1520    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1521     * which removes the handshake from t->outgoingHandshakes... */
1522    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1523        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1524}
1525
1526void
1527tr_peerMgrStopTorrent( tr_peerMgr *    manager,
1528                       const uint8_t * torrentHash )
1529{
1530    managerLock( manager );
1531
1532    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1533
1534    managerUnlock( manager );
1535}
1536
1537void
1538tr_peerMgrAddTorrent( tr_peerMgr * manager,
1539                      tr_torrent * tor )
1540{
1541    Torrent * t;
1542
1543    managerLock( manager );
1544
1545    assert( tor );
1546    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1547
1548    t = torrentConstructor( manager, tor );
1549    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1550
1551    managerUnlock( manager );
1552}
1553
1554void
1555tr_peerMgrRemoveTorrent( tr_peerMgr *    manager,
1556                         const uint8_t * torrentHash )
1557{
1558    Torrent * t;
1559
1560    managerLock( manager );
1561
1562    t = getExistingTorrent( manager, torrentHash );
1563    assert( t );
1564    stopTorrent( t );
1565    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1566    torrentDestructor( t );
1567
1568    managerUnlock( manager );
1569}
1570
1571void
1572tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1573                               const uint8_t *    torrentHash,
1574                               int8_t *           tab,
1575                               unsigned int       tabCount )
1576{
1577    tr_piece_index_t   i;
1578    const Torrent *    t;
1579    const tr_torrent * tor;
1580    float              interval;
1581    int                isComplete;
1582    int                peerCount;
1583    const tr_peer **   peers;
1584
1585    managerLock( manager );
1586
1587    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1588    tor = t->tor;
1589    interval = tor->info.pieceCount / (float)tabCount;
1590    isComplete = tor
1591                 && ( tr_cpGetStatus ( tor->completion ) == TR_CP_COMPLETE );
1592    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1593
1594    memset( tab, 0, tabCount );
1595
1596    for( i = 0; tor && i < tabCount; ++i )
1597    {
1598        const int piece = i * interval;
1599
1600        if( isComplete || tr_cpPieceIsComplete( tor->completion, piece ) )
1601            tab[i] = -1;
1602        else if( peerCount )
1603        {
1604            int j;
1605            for( j = 0; j < peerCount; ++j )
1606                if( tr_bitfieldHas( peers[j]->have, i ) )
1607                    ++tab[i];
1608        }
1609    }
1610
1611    managerUnlock( manager );
1612}
1613
1614/* Returns the pieces that are available from peers */
1615tr_bitfield*
1616tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1617                        const uint8_t *    torrentHash )
1618{
1619    int           i, size;
1620    Torrent *     t;
1621    tr_peer **    peers;
1622    tr_bitfield * pieces;
1623
1624    managerLock( manager );
1625
1626    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1627    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1628    peers = getConnectedPeers( t, &size );
1629    for( i = 0; i < size; ++i )
1630        tr_bitfieldOr( pieces, peers[i]->have );
1631
1632    managerUnlock( manager );
1633    tr_free( peers );
1634    return pieces;
1635}
1636
1637int
1638tr_peerMgrHasConnections( const tr_peerMgr * manager,
1639                          const uint8_t *    torrentHash )
1640{
1641    int             ret;
1642    const Torrent * t;
1643
1644    managerLock( manager );
1645
1646    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1647    ret = t && ( !tr_ptrArrayEmpty( t->peers )
1648               || !tr_ptrArrayEmpty( t->webseeds ) );
1649
1650    managerUnlock( manager );
1651    return ret;
1652}
1653
1654void
1655tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1656                        const uint8_t *    torrentHash,
1657                        int *              setmePeersKnown,
1658                        int *              setmePeersConnected,
1659                        int *              setmeSeedsConnected,
1660                        int *              setmeWebseedsSendingToUs,
1661                        int *              setmePeersSendingToUs,
1662                        int *              setmePeersGettingFromUs,
1663                        int *              setmePeersFrom )
1664{
1665    int                 i, size;
1666    const Torrent *     t;
1667    const tr_peer **    peers;
1668    const tr_webseed ** webseeds;
1669
1670    managerLock( manager );
1671
1672    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1673    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1674
1675    *setmePeersKnown           = tr_ptrArraySize( t->pool );
1676    *setmePeersConnected       = 0;
1677    *setmeSeedsConnected       = 0;
1678    *setmePeersGettingFromUs   = 0;
1679    *setmePeersSendingToUs     = 0;
1680    *setmeWebseedsSendingToUs  = 0;
1681
1682    for( i = 0; i < TR_PEER_FROM__MAX; ++i )
1683        setmePeersFrom[i] = 0;
1684
1685    for( i = 0; i < size; ++i )
1686    {
1687        const tr_peer *          peer = peers[i];
1688        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1689
1690        if( peer->io == NULL ) /* not connected */
1691            continue;
1692
1693        ++ * setmePeersConnected;
1694
1695        ++setmePeersFrom[atom->from];
1696
1697        if( clientIsDownloadingFrom( peer ) )
1698            ++ * setmePeersSendingToUs;
1699
1700        if( clientIsUploadingTo( peer ) )
1701            ++ * setmePeersGettingFromUs;
1702
1703        if( atom->flags & ADDED_F_SEED_FLAG )
1704            ++ * setmeSeedsConnected;
1705    }
1706
1707    webseeds = (const tr_webseed **) tr_ptrArrayPeek( t->webseeds, &size );
1708    for( i = 0; i < size; ++i )
1709    {
1710        if( tr_webseedIsActive( webseeds[i] ) )
1711            ++ * setmeWebseedsSendingToUs;
1712    }
1713
1714    managerUnlock( manager );
1715}
1716
1717double
1718tr_peerMgrGetRate( const tr_peerMgr * manager,
1719                   tr_direction       direction )
1720{
1721    int    i;
1722    double bytes = 0;
1723
1724    assert( manager != NULL );
1725    assert( direction == TR_UP || direction == TR_DOWN );
1726
1727    for( i = 0; i < BANDWIDTH_PULSE_HISTORY; ++i )
1728        bytes += manager->rateHistory[direction][i];
1729
1730    return ( BANDWIDTH_PULSES_PER_SECOND * bytes )
1731           / ( BANDWIDTH_PULSE_HISTORY * 1024 );
1732}
1733
1734float*
1735tr_peerMgrWebSpeeds( const tr_peerMgr * manager,
1736                     const uint8_t *    torrentHash )
1737{
1738    const Torrent *     t;
1739    const tr_webseed ** webseeds;
1740    int                 i;
1741    int                 webseedCount;
1742    float *             ret;
1743
1744    assert( manager );
1745    managerLock( manager );
1746
1747    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1748    webseeds = (const tr_webseed**) tr_ptrArrayPeek( t->webseeds,
1749                                                     &webseedCount );
1750    assert( webseedCount == t->tor->info.webseedCount );
1751    ret = tr_new0( float, webseedCount );
1752
1753    for( i = 0; i < webseedCount; ++i )
1754        if( !tr_webseedGetSpeed( webseeds[i], &ret[i] ) )
1755            ret[i] = -1.0;
1756
1757    managerUnlock( manager );
1758    return ret;
1759}
1760
1761struct tr_peer_stat *
1762tr_peerMgrPeerStats( const   tr_peerMgr  * manager,
1763                     const   uint8_t     * torrentHash,
1764                     int                 * setmeCount UNUSED )
1765{
1766    int             i, size;
1767    const Torrent * t;
1768    tr_peer **      peers;
1769    tr_peer_stat *  ret;
1770
1771    assert( manager );
1772    managerLock( manager );
1773
1774    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1775    peers = getConnectedPeers( (Torrent*)t, &size );
1776    ret = tr_new0( tr_peer_stat, size );
1777
1778    for( i = 0; i < size; ++i )
1779    {
1780        char *                   pch;
1781        const tr_peer *          peer = peers[i];
1782        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1783        tr_peer_stat *           stat = ret + i;
1784
1785        tr_netNtop( &peer->in_addr, stat->addr, sizeof( stat->addr ) );
1786        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
1787                   sizeof( stat->client ) );
1788        stat->port               = peer->port;
1789        stat->from               = atom->from;
1790        stat->progress           = peer->progress;
1791        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1792        stat->rateToPeer         = tr_peerIoGetRateToPeer( peer->io );
1793        stat->rateToClient       = tr_peerIoGetRateToClient( peer->io );
1794        stat->peerIsChoked       = peer->peerIsChoked;
1795        stat->peerIsInterested   = peer->peerIsInterested;
1796        stat->clientIsChoked     = peer->clientIsChoked;
1797        stat->clientIsInterested = peer->clientIsInterested;
1798        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1799        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1800        stat->isUploadingTo      = clientIsUploadingTo( peer );
1801
1802        pch = stat->flagStr;
1803        if( t->optimistic == peer ) *pch++ = 'O';
1804        if( stat->isDownloadingFrom ) *pch++ = 'D';
1805        else if( stat->clientIsInterested ) *pch++ = 'd';
1806        if( stat->isUploadingTo ) *pch++ = 'U';
1807        else if( stat->peerIsInterested ) *pch++ = 'u';
1808        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ =
1809                'K';
1810        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1811        if( stat->isEncrypted ) *pch++ = 'E';
1812        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1813        if( stat->isIncoming ) *pch++ = 'I';
1814        *pch = '\0';
1815    }
1816
1817    *setmeCount = size;
1818    tr_free( peers );
1819
1820    managerUnlock( manager );
1821    return ret;
1822}
1823
1824/**
1825***
1826**/
1827
1828struct ChokeData
1829{
1830    unsigned int    doUnchoke    : 1;
1831    unsigned int    isInterested : 1;
1832    double          rateToClient;
1833    double          rateToPeer;
1834    tr_peer *       peer;
1835};
1836
1837static int
1838tr_compareDouble( double a,
1839                  double b )
1840{
1841    if( a < b ) return -1;
1842    if( a > b ) return 1;
1843    return 0;
1844}
1845
1846static int
1847compareChoke( const void * va,
1848              const void * vb )
1849{
1850    const struct ChokeData * a = va;
1851    const struct ChokeData * b = vb;
1852    int                      diff = 0;
1853
1854    if( diff == 0 ) /* prefer higher dl speeds */
1855        diff = -tr_compareDouble( a->rateToClient, b->rateToClient );
1856    if( diff == 0 ) /* prefer higher ul speeds */
1857        diff = -tr_compareDouble( a->rateToPeer, b->rateToPeer );
1858    if( diff == 0 ) /* prefer unchoked */
1859        diff = (int)a->peer->peerIsChoked - (int)b->peer->peerIsChoked;
1860
1861    return diff;
1862}
1863
1864static int
1865isNew( const tr_peer * peer )
1866{
1867    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1868}
1869
1870static int
1871isSame( const tr_peer * peer )
1872{
1873    return peer && peer->client && strstr( peer->client, "Transmission" );
1874}
1875
1876/**
1877***
1878**/
1879
1880static void
1881rechoke( Torrent * t )
1882{
1883    int                i, peerCount, size, unchokedInterested;
1884    tr_peer **         peers = getConnectedPeers( t, &peerCount );
1885    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1886
1887    assert( torrentIsLocked( t ) );
1888
1889    /* sort the peers by preference and rate */
1890    for( i = 0, size = 0; i < peerCount; ++i )
1891    {
1892        tr_peer * peer = peers[i];
1893        if( peer->progress >= 1.0 ) /* choke all seeds */
1894            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1895        else
1896        {
1897            struct ChokeData * node = &choke[size++];
1898            node->peer = peer;
1899            node->isInterested = peer->peerIsInterested;
1900            node->rateToClient = tr_peerIoGetRateToClient( peer->io );
1901            node->rateToPeer = tr_peerIoGetRateToPeer( peer->io );
1902        }
1903    }
1904
1905    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
1906
1907    /**
1908     * Reciprocation and number of uploads capping is managed by unchoking
1909     * the N peers which have the best upload rate and are interested.
1910     * This maximizes the client's download rate. These N peers are
1911     * referred to as downloaders, because they are interested in downloading
1912     * from the client.
1913     *
1914     * Peers which have a better upload rate (as compared to the downloaders)
1915     * but aren't interested get unchoked. If they become interested, the
1916     * downloader with the worst upload rate gets choked. If a client has
1917     * a complete file, it uses its upload rate rather than its download
1918     * rate to decide which peers to unchoke.
1919     */
1920    unchokedInterested = 0;
1921    for( i = 0; i < size && unchokedInterested < MAX_UNCHOKED_PEERS; ++i )
1922    {
1923        choke[i].doUnchoke = 1;
1924        if( choke[i].isInterested )
1925            ++unchokedInterested;
1926    }
1927
1928    /* optimistic unchoke */
1929    if( i < size )
1930    {
1931        int                n;
1932        struct ChokeData * c;
1933        tr_ptrArray *      randPool = tr_ptrArrayNew( );
1934
1935        for( ; i < size; ++i )
1936        {
1937            if( choke[i].isInterested )
1938            {
1939                const tr_peer * peer = choke[i].peer;
1940                int             x = 1, y;
1941                if( isNew( peer ) ) x *= 3;
1942                if( isSame( peer ) ) x *= 3;
1943                for( y = 0; y < x; ++y )
1944                    tr_ptrArrayAppend( randPool, &choke[i] );
1945            }
1946        }
1947
1948        if( ( n = tr_ptrArraySize( randPool ) ) )
1949        {
1950            c = tr_ptrArrayNth( randPool, tr_cryptoWeakRandInt( n ) );
1951            c->doUnchoke = 1;
1952            t->optimistic = c->peer;
1953        }
1954
1955        tr_ptrArrayFree( randPool, NULL );
1956    }
1957
1958    for( i = 0; i < size; ++i )
1959        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1960
1961    /* cleanup */
1962    tr_free( choke );
1963    tr_free( peers );
1964}
1965
1966static int
1967rechokePulse( void * vtorrent )
1968{
1969    Torrent * t = vtorrent;
1970
1971    torrentLock( t );
1972    rechoke( t );
1973    torrentUnlock( t );
1974    return TRUE;
1975}
1976
1977/***
1978****
1979****  Life and Death
1980****
1981***/
1982
1983static int
1984shouldPeerBeClosed( const Torrent * t,
1985                    const tr_peer * peer,
1986                    int             peerCount )
1987{
1988    const tr_torrent *       tor = t->tor;
1989    const time_t             now = time( NULL );
1990    const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1991
1992    /* if it's marked for purging, close it */
1993    if( peer->doPurge )
1994    {
1995        tordbg( t, "purging peer %s because its doPurge flag is set",
1996               tr_peerIoAddrStr( &atom->addr,
1997                                 atom->port ) );
1998        return TRUE;
1999    }
2000
2001    /* if we're seeding and the peer has everything we have,
2002     * and enough time has passed for a pex exchange, then disconnect */
2003    if( tr_torrentIsSeed( tor ) )
2004    {
2005        int peerHasEverything;
2006        if( atom->flags & ADDED_F_SEED_FLAG )
2007            peerHasEverything = TRUE;
2008        else if( peer->progress < tr_cpPercentDone( tor->completion ) )
2009            peerHasEverything = FALSE;
2010        else
2011        {
2012            tr_bitfield * tmp =
2013                tr_bitfieldDup( tr_cpPieceBitfield( tor->completion ) );
2014            tr_bitfieldDifference( tmp, peer->have );
2015            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2016            tr_bitfieldFree( tmp );
2017        }
2018        if( peerHasEverything
2019          && ( !tr_torrentAllowsPex( tor ) || ( now - atom->time >= 30 ) ) )
2020        {
2021            tordbg( t, "purging peer %s because we're both seeds",
2022                   tr_peerIoAddrStr( &atom->addr,
2023                                     atom->port ) );
2024            return TRUE;
2025        }
2026    }
2027
2028    /* disconnect if it's been too long since piece data has been transferred.
2029     * this is on a sliding scale based on number of available peers... */
2030    {
2031        const int    relaxStrictnessIfFewerThanN =
2032            (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2033        /* if we have >= relaxIfFewerThan, strictness is 100%.
2034         * if we have zero connections, strictness is 0% */
2035        const float  strictness = peerCount >= relaxStrictnessIfFewerThanN
2036                                  ? 1.0
2037                                  : peerCount /
2038                                  (float)relaxStrictnessIfFewerThanN;
2039        const int    lo = MIN_UPLOAD_IDLE_SECS;
2040        const int    hi = MAX_UPLOAD_IDLE_SECS;
2041        const int    limit = lo + ( ( hi - lo ) * strictness );
2042        const time_t then = peer->pieceDataActivityDate;
2043        const int    idleTime = then ? ( now - then ) : 0;
2044        if( idleTime > limit )
2045        {
2046            tordbg(
2047                t,
2048                "purging peer %s because it's been %d secs since we shared anything",
2049                tr_peerIoAddrStr( &atom->addr, atom->port ), idleTime );
2050            return TRUE;
2051        }
2052    }
2053
2054    return FALSE;
2055}
2056
2057static tr_peer **
2058getPeersToClose( Torrent * t,
2059                 int *     setmeSize )
2060{
2061    int               i, peerCount, outsize;
2062    tr_peer **        peers = (tr_peer**) tr_ptrArrayPeek( t->peers,
2063                                                           &peerCount );
2064    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2065
2066    assert( torrentIsLocked( t ) );
2067
2068    for( i = outsize = 0; i < peerCount; ++i )
2069        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
2070            ret[outsize++] = peers[i];
2071
2072    *setmeSize = outsize;
2073    return ret;
2074}
2075
2076static int
2077compareCandidates( const void * va,
2078                   const void * vb )
2079{
2080    const struct peer_atom * a = *(const struct peer_atom**) va;
2081    const struct peer_atom * b = *(const struct peer_atom**) vb;
2082
2083    /* <Charles> Here we would probably want to try reconnecting to
2084     * peers that had most recently given us data. Lots of users have
2085     * trouble with resets due to their routers and/or ISPs. This way we
2086     * can quickly recover from an unwanted reset. So we sort
2087     * piece_data_time in descending order.
2088     */
2089
2090    if( a->piece_data_time != b->piece_data_time )
2091        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2092
2093    if( a->numFails != b->numFails )
2094        return a->numFails < b->numFails ? -1 : 1;
2095
2096    if( a->time != b->time )
2097        return a->time < b->time ? -1 : 1;
2098
2099    return 0;
2100}
2101
2102static int
2103getReconnectIntervalSecs( const struct peer_atom * atom )
2104{
2105    int          sec;
2106    const time_t now = time( NULL );
2107
2108    /* if we were recently connected to this peer and transferring piece
2109     * data, try to reconnect to them sooner rather that later -- we don't
2110     * want network troubles to get in the way of a good peer. */
2111    if( ( now - atom->piece_data_time ) <=
2112       ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2113        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2114
2115    /* don't allow reconnects more often than our minimum */
2116    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2117        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2118
2119    /* otherwise, the interval depends on how many times we've tried
2120     * and failed to connect to the peer */
2121    else switch( atom->numFails )
2122        {
2123            case 0:
2124                sec = 0; break;
2125
2126            case 1:
2127                sec = 5; break;
2128
2129            case 2:
2130                sec = 2 * 60; break;
2131
2132            case 3:
2133                sec = 15 * 60; break;
2134
2135            case 4:
2136                sec = 30 * 60; break;
2137
2138            case 5:
2139                sec = 60 * 60; break;
2140
2141            default:
2142                sec = 120 * 60; break;
2143        }
2144
2145    return sec;
2146}
2147
2148static struct peer_atom **
2149getPeerCandidates(                               Torrent * t,
2150                                           int * setmeSize )
2151{
2152    int                 i, atomCount, retCount;
2153    struct peer_atom ** atoms;
2154    struct peer_atom ** ret;
2155    const time_t        now = time( NULL );
2156    const int           seed = tr_torrentIsSeed( t->tor );
2157
2158    assert( torrentIsLocked( t ) );
2159
2160    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &atomCount );
2161    ret = tr_new( struct peer_atom*, atomCount );
2162    for( i = retCount = 0; i < atomCount; ++i )
2163    {
2164        int                interval;
2165        struct peer_atom * atom = atoms[i];
2166
2167        /* peer fed us too much bad data ... we only keep it around
2168         * now to weed it out in case someone sends it to us via pex */
2169        if( atom->myflags & MYFLAG_BANNED )
2170            continue;
2171
2172        /* peer was unconnectable before, so we're not going to keep trying.
2173         * this is needs a separate flag from `banned', since if they try
2174         * to connect to us later, we'll let them in */
2175        if( atom->myflags & MYFLAG_UNREACHABLE )
2176            continue;
2177
2178        /* we don't need two connections to the same peer... */
2179        if( peerIsInUse( t, &atom->addr ) )
2180            continue;
2181
2182        /* no need to connect if we're both seeds... */
2183        if( seed && ( atom->flags & ADDED_F_SEED_FLAG ) )
2184            continue;
2185
2186        /* don't reconnect too often */
2187        interval = getReconnectIntervalSecs( atom );
2188        if( ( now - atom->time ) < interval )
2189        {
2190            tordbg(
2191                t,
2192                "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2193                i, tr_peerIoAddrStr( &atom->addr,
2194                                     atom->port ), interval );
2195            continue;
2196        }
2197
2198        /* Don't connect to peers in our blocklist */
2199        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2200            continue;
2201
2202        ret[retCount++] = atom;
2203    }
2204
2205    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2206    *setmeSize = retCount;
2207    return ret;
2208}
2209
2210static int
2211reconnectPulse( void * vtorrent )
2212{
2213    Torrent *     t = vtorrent;
2214    static time_t prevTime = 0;
2215    static int    newConnectionsThisSecond = 0;
2216    time_t        now;
2217
2218    torrentLock( t );
2219
2220    now = time( NULL );
2221    if( prevTime != now )
2222    {
2223        prevTime = now;
2224        newConnectionsThisSecond = 0;
2225    }
2226
2227    if( !t->isRunning )
2228    {
2229        removeAllPeers( t );
2230    }
2231    else
2232    {
2233        int                 i, nCandidates, nBad;
2234        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
2235        struct tr_peer **   connections = getPeersToClose( t, &nBad );
2236
2237        if( nBad || nCandidates )
2238            tordbg(
2239                t, "reconnect pulse for [%s]: %d bad connections, "
2240                   "%d connection candidates, %d atoms, max per pulse is %d",
2241                t->tor->info.name, nBad, nCandidates,
2242                tr_ptrArraySize( t->pool ),
2243                (int)MAX_RECONNECTIONS_PER_PULSE );
2244
2245        /* disconnect some peers.
2246           if we transferred piece data, then they might be good peers,
2247           so reset their `numFails' weight to zero.  otherwise we connected
2248           to them fruitlessly, so mark it as another fail */
2249        for( i = 0; i < nBad; ++i )
2250        {
2251            tr_peer *          peer = connections[i];
2252            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
2253            if( peer->pieceDataActivityDate )
2254                atom->numFails = 0;
2255            else
2256                ++atom->numFails;
2257            tordbg( t, "removing bad peer %s",
2258                   tr_peerIoGetAddrStr( peer->io ) );
2259            removePeer( t, peer );
2260        }
2261
2262        /* add some new ones */
2263        for( i = 0;    ( i < nCandidates )
2264           && ( i < MAX_RECONNECTIONS_PER_PULSE )
2265           && ( getPeerCount( t ) < getMaxPeerCount( t->tor ) )
2266           && ( newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND );
2267             ++i )
2268        {
2269            tr_peerMgr *       mgr = t->manager;
2270            struct peer_atom * atom = candidates[i];
2271            tr_peerIo *        io;
2272
2273            tordbg( t, "Starting an OUTGOING connection with %s",
2274                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2275
2276            io =
2277                tr_peerIoNewOutgoing( mgr->session, &atom->addr, atom->port,
2278                                      t->hash );
2279            if( io == NULL )
2280            {
2281                atom->myflags |= MYFLAG_UNREACHABLE;
2282            }
2283            else
2284            {
2285                tr_handshake * handshake = tr_handshakeNew(
2286                    io,
2287                    mgr->session->
2288                    encryptionMode,
2289                    myHandshakeDoneCB,
2290                    mgr );
2291
2292                assert( tr_peerIoGetTorrentHash( io ) );
2293
2294                ++newConnectionsThisSecond;
2295
2296                tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake,
2297                                         handshakeCompare );
2298            }
2299
2300            atom->time = time( NULL );
2301        }
2302
2303        /* cleanup */
2304        tr_free( connections );
2305        tr_free( candidates );
2306    }
2307
2308    torrentUnlock( t );
2309    return TRUE;
2310}
2311
2312/****
2313*****
2314*****  BANDWIDTH ALLOCATION
2315*****
2316****/
2317
2318static double
2319allocateHowMuch( double         desiredAvgKB,
2320                 const double * history )
2321{
2322    const double baseline = desiredAvgKB * 1024.0 /
2323                            BANDWIDTH_PULSES_PER_SECOND;
2324    const double min = baseline * 0.66;
2325    const double max = baseline * 1.33;
2326    int          i;
2327    double       usedBytes;
2328    double       n;
2329    double       clamped;
2330
2331    for( usedBytes = i = 0; i < BANDWIDTH_PULSE_HISTORY; ++i )
2332        usedBytes += history[i];
2333
2334    n = ( desiredAvgKB * 1024.0 )
2335      * ( BANDWIDTH_PULSE_HISTORY + 1.0 )
2336      / BANDWIDTH_PULSES_PER_SECOND
2337      - usedBytes;
2338
2339    /* clamp the return value to lessen oscillation */
2340    clamped = n;
2341    clamped = MAX( clamped, min );
2342    clamped = MIN( clamped, max );
2343/*fprintf( stderr, "desiredAvgKB is %.2f, rate is %.2f, allocating %.2f
2344  (%.2f)\n", desiredAvgKB,
2345  ((usedBytes*BANDWIDTH_PULSES_PER_SECOND)/BANDWIDTH_PULSE_HISTORY)/1024.0,
2346  clamped/1024.0, n/1024.0 );*/
2347    return clamped;
2348}
2349
2350/**
2351 * Distributes a fixed amount of bandwidth among a set of peers.
2352 *
2353 * @param peerArray peers whose client-to-peer bandwidth will be set
2354 * @param direction whether to allocate upload or download bandwidth
2355 * @param history recent bandwidth history for these peers
2356 * @param desiredAvgKB overall bandwidth goal for this set of peers
2357 */
2358static void
2359setPeerBandwidth( tr_ptrArray *      peerArray,
2360                  const tr_direction direction,
2361                  const double *     history,
2362                  double             desiredAvgKB )
2363{
2364    const int    peerCount = tr_ptrArraySize( peerArray );
2365    const double bytes = allocateHowMuch( desiredAvgKB, history );
2366    const double welfareBytes = MIN( 2048, bytes * 0.2 );
2367    const double meritBytes = MAX( 0, bytes - welfareBytes );
2368    tr_peer **   peers = (tr_peer**) tr_ptrArrayBase( peerArray );
2369    tr_peer **   candidates = tr_new( tr_peer *, peerCount );
2370    int          i;
2371    int          candidateCount;
2372    double       welfare;
2373    size_t       bytesUsed;
2374
2375    assert( meritBytes >= 0.0 );
2376    assert( welfareBytes >= 0.0 );
2377    assert( direction == TR_UP || direction == TR_DOWN );
2378
2379    for( i = candidateCount = 0; i < peerCount; ++i )
2380        if( tr_peerIoWantsBandwidth( peers[i]->io, direction ) )
2381            candidates[candidateCount++] = peers[i];
2382        else
2383            tr_peerIoSetBandwidth( peers[i]->io, direction, 0 );
2384
2385    for( i = bytesUsed = 0; i < candidateCount; ++i )
2386        bytesUsed += tr_peerIoGetBandwidthUsed( candidates[i]->io,
2387                                                direction );
2388
2389    welfare = welfareBytes / candidateCount;
2390
2391    for( i = 0; i < candidateCount; ++i )
2392    {
2393        tr_peer *    peer = candidates[i];
2394        const double merit = bytesUsed
2395                             ? ( meritBytes *
2396                                tr_peerIoGetBandwidthUsed( peer->io,
2397                                                           direction ) ) /
2398                             bytesUsed
2399                             : ( meritBytes / candidateCount );
2400        tr_peerIoSetBandwidth( peer->io, direction, merit + welfare );
2401    }
2402
2403    /* cleanup */
2404    tr_free( candidates );
2405}
2406
2407static size_t
2408countHandshakeBandwidth( tr_ptrArray * handshakes,
2409                         tr_direction  direction )
2410{
2411    const int n = tr_ptrArraySize( handshakes );
2412    int       i;
2413    size_t    total;
2414
2415    for( i = total = 0; i < n; ++i )
2416    {
2417        tr_peerIo * io = tr_handshakeGetIO( tr_ptrArrayNth( handshakes, i ) );
2418        total += tr_peerIoGetBandwidthUsed( io, direction );
2419    }
2420    return total;
2421}
2422
2423static size_t
2424countPeerBandwidth( tr_ptrArray * peers,
2425                    tr_direction  direction )
2426{
2427    const int n = tr_ptrArraySize( peers );
2428    int       i;
2429    size_t    total;
2430
2431    for( i = total = 0; i < n; ++i )
2432    {
2433        tr_peer * peer = tr_ptrArrayNth( peers, i );
2434        total += tr_peerIoGetBandwidthUsed( peer->io, direction );
2435    }
2436    return total;
2437}
2438
2439static void
2440givePeersUnlimitedBandwidth( tr_ptrArray * peers,
2441                             tr_direction  direction )
2442{
2443    const int n = tr_ptrArraySize( peers );
2444    int       i;
2445
2446    for( i = 0; i < n; ++i )
2447    {
2448        tr_peer * peer = tr_ptrArrayNth( peers, i );
2449        tr_peerIoSetBandwidthUnlimited( peer->io, direction );
2450    }
2451}
2452
2453static void
2454pumpAllPeers( tr_peerMgr * mgr )
2455{
2456    const int torrentCount = tr_ptrArraySize( mgr->torrents );
2457    int       i, j;
2458
2459    for( i = 0; i < torrentCount; ++i )
2460    {
2461        Torrent * t = tr_ptrArrayNth( mgr->torrents, i );
2462        for( j = 0; j < tr_ptrArraySize( t->peers ); ++j )
2463        {
2464            tr_peer * peer = tr_ptrArrayNth( t->peers, j );
2465            tr_peerMsgsPulse( peer->msgs );
2466        }
2467    }
2468}
2469
2470/**
2471 * Allocate bandwidth for each peer connection.
2472 *
2473 * @param mgr the peer manager
2474 * @param direction whether to allocate upload or download bandwidth
2475 * @return the amount of directional bandwidth used since the last pulse.
2476 */
2477static double
2478allocateBandwidth( tr_peerMgr * mgr,
2479                   tr_direction direction )
2480{
2481    tr_session *  session = mgr->session;
2482    const int     pulseNumber = mgr->bandwidthPulseNumber;
2483    const int     torrentCount = tr_ptrArraySize( mgr->torrents );
2484    Torrent **    torrents = (Torrent **) tr_ptrArrayBase( mgr->torrents );
2485    tr_ptrArray * globalPool = tr_ptrArrayNew( );
2486    double        allBytesUsed = 0;
2487    size_t        poolBytesUsed = 0;
2488    int           i;
2489
2490    assert( mgr );
2491    assert( direction == TR_UP || direction == TR_DOWN );
2492
2493    /* before allocating bandwidth, pump the connected peers */
2494    pumpAllPeers( mgr );
2495
2496    for( i = 0; i < torrentCount; ++i )
2497    {
2498        Torrent *    t = torrents[i];
2499        const size_t used = countPeerBandwidth( t->peers, direction );
2500        countHandshakeBandwidth( t->outgoingHandshakes, direction );
2501
2502        /* remember this torrent's bytes used */
2503        t->tor->rateHistory[direction][pulseNumber] = used;
2504
2505        /* add this torrent's bandwidth use to allBytesUsed */
2506        allBytesUsed += used;
2507
2508        /* process the torrent's peers based on its speed mode */
2509        switch( tr_torrentGetSpeedMode( t->tor, direction ) )
2510        {
2511            case TR_SPEEDLIMIT_UNLIMITED:
2512                givePeersUnlimitedBandwidth( t->peers, direction );
2513                break;
2514
2515            case TR_SPEEDLIMIT_SINGLE:
2516                setPeerBandwidth( t->peers, direction,
2517                                  t->tor->rateHistory[direction],
2518                                  tr_torrentGetSpeedLimit( t->tor,
2519                                                           direction ) );
2520                break;
2521
2522            case TR_SPEEDLIMIT_GLOBAL:
2523            {
2524                int       i;
2525                const int n = tr_ptrArraySize( t->peers );
2526                for( i = 0; i < n; ++i )
2527                    tr_ptrArrayAppend( globalPool,
2528                                      tr_ptrArrayNth( t->peers, i ) );
2529                poolBytesUsed += used;
2530                break;
2531            }
2532        }
2533    }
2534
2535    /* add incoming handshakes to the global pool */
2536    i = countHandshakeBandwidth( mgr->incomingHandshakes, direction );
2537    allBytesUsed += i;
2538    poolBytesUsed += i;
2539
2540    mgr->globalPoolHistory[direction][pulseNumber] = poolBytesUsed;
2541
2542    /* handle the global pool's connections */
2543    if( !tr_sessionIsSpeedLimitEnabled( session, direction ) )
2544        givePeersUnlimitedBandwidth( globalPool, direction );
2545    else
2546        setPeerBandwidth( globalPool, direction,
2547                         mgr->globalPoolHistory[direction],
2548                         tr_sessionGetSpeedLimit( session, direction ) );
2549
2550    /* now that we've allocated bandwidth, pump all the connected peers */
2551    pumpAllPeers( mgr );
2552
2553    /* cleanup */
2554    tr_ptrArrayFree( globalPool, NULL );
2555    return allBytesUsed;
2556}
2557
2558static int
2559bandwidthPulse( void * vmgr )
2560{
2561    tr_peerMgr * mgr = vmgr;
2562    int          i;
2563
2564    managerLock( mgr );
2565
2566    /* keep track of how far we are into the cycle */
2567    if( ++mgr->bandwidthPulseNumber == BANDWIDTH_PULSE_HISTORY )
2568        mgr->bandwidthPulseNumber = 0;
2569
2570    /* allocate the upload and download bandwidth */
2571    for( i = 0; i < 2; ++i )
2572        mgr->rateHistory[i][mgr->bandwidthPulseNumber] =
2573            allocateBandwidth( mgr, i );
2574
2575    managerUnlock( mgr );
2576    return TRUE;
2577}
2578
Note: See TracBrowser for help on using the repository browser.