source: trunk/libtransmission/peer-mgr.c @ 7148

Last change on this file since 7148 was 7148, checked in by charles, 12 years ago

oops, turn off a debugging message

  • Property svn:keywords set to Date Rev Author Id
File size: 71.7 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 7148 2008-11-24 04:35:34Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h> /* memcpy, memcmp, strstr */
16#include <stdlib.h> /* qsort */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "bandwidth.h"
23#include "bencode.h"
24#include "blocklist.h"
25#include "clients.h"
26#include "completion.h"
27#include "crypto.h"
28#include "handshake.h"
29#include "inout.h" /* tr_ioTestPiece */
30#include "net.h"
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-mgr-private.h"
34#include "peer-msgs.h"
35#include "ptrarray.h"
36#include "ratecontrol.h"
37#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
38#include "torrent.h"
39#include "trevent.h"
40#include "utils.h"
41#include "webseed.h"
42
43enum
44{
45    /* how frequently to change which peers are choked */
46    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
47
48    /* minimum interval for refilling peers' request lists */
49    REFILL_PERIOD_MSEC = 333,
50
51    /* when many peers are available, keep idle ones this long */
52    MIN_UPLOAD_IDLE_SECS = ( 60 * 3 ),
53
54    /* when few peers are available, keep idle ones this long */
55    MAX_UPLOAD_IDLE_SECS = ( 60 * 10 ),
56
57    /* how frequently to decide which peers live and die */
58    RECONNECT_PERIOD_MSEC = ( 2 * 1000 ),
59   
60    /* how frequently to reallocate bandwidth */
61    BANDWIDTH_PERIOD_MSEC = 333,
62
63    /* max # of peers to ask fer per torrent per reconnect pulse */
64    MAX_RECONNECTIONS_PER_PULSE = 4,
65
66    /* max number of peers to ask for per second overall.
67    * this throttle is to avoid overloading the router */
68    MAX_CONNECTIONS_PER_SECOND = 8,
69
70    /* number of unchoked peers per torrent.
71     * FIXME: this probably ought to be configurable */
72    MAX_UNCHOKED_PEERS = 14,
73
74    /* number of bad pieces a peer is allowed to send before we ban them */
75    MAX_BAD_PIECES_PER_PEER = 5,
76
77    /* use for bitwise operations w/peer_atom.myflags */
78    MYFLAG_BANNED = 1,
79
80    /* unreachable for now... but not banned.
81     * if they try to connect to us it's okay */
82    MYFLAG_UNREACHABLE = 2,
83
84    /* the minimum we'll wait before attempting to reconnect to a peer */
85    MINIMUM_RECONNECT_INTERVAL_SECS = 5
86};
87
88
89/**
90***
91**/
92
93/* We keep one of these for every peer we know about, whether
94 * it's connected or not, so the struct must be small.
95 * When our current connections underperform, we dip back
96 * into this list for new ones. */
97struct peer_atom
98{
99    uint8_t           from;
100    uint8_t           flags; /* these match the added_f flags */
101    uint8_t           myflags; /* flags that aren't defined in added_f */
102    uint16_t          port;
103    uint16_t          numFails;
104    struct in_addr    addr;
105    time_t            time; /* when the peer's connection status last changed */
106    time_t            piece_data_time;
107};
108
109typedef struct
110{
111    unsigned int    isRunning : 1;
112
113    uint8_t         hash[SHA_DIGEST_LENGTH];
114    int         *   pendingRequestCount;
115    tr_ptrArray *   outgoingHandshakes; /* tr_handshake */
116    tr_ptrArray *   pool; /* struct peer_atom */
117    tr_ptrArray *   peers; /* tr_peer */
118    tr_ptrArray *   webseeds; /* tr_webseed */
119    tr_timer *      reconnectTimer;
120    tr_timer *      rechokeTimer;
121    tr_timer *      refillTimer;
122    tr_torrent *    tor;
123    tr_peer *       optimistic; /* the optimistic peer, or NULL if none */
124
125    struct tr_peerMgr * manager;
126}
127Torrent;
128
129struct tr_peerMgr
130{
131    tr_session      * session;
132    tr_ptrArray     * torrents; /* Torrent */
133    tr_ptrArray     * incomingHandshakes; /* tr_handshake */
134    tr_timer        * bandwidthTimer;
135};
136
137#define tordbg( t, ... ) \
138    do { \
139        if( tr_deepLoggingIsActive( ) ) \
140            tr_deepLog( __FILE__, __LINE__, t->tor->info.name, __VA_ARGS__ ); \
141    } while( 0 )
142
143#define dbgmsg( ... ) \
144    do { \
145        if( tr_deepLoggingIsActive( ) ) \
146            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
147    } while( 0 )
148
149/**
150***
151**/
152
153static void
154managerLock( const struct tr_peerMgr * manager )
155{
156    tr_globalLock( manager->session );
157}
158
159static void
160managerUnlock( const struct tr_peerMgr * manager )
161{
162    tr_globalUnlock( manager->session );
163}
164
165static void
166torrentLock( Torrent * torrent )
167{
168    managerLock( torrent->manager );
169}
170
171static void
172torrentUnlock( Torrent * torrent )
173{
174    managerUnlock( torrent->manager );
175}
176
177static int
178torrentIsLocked( const Torrent * t )
179{
180    return tr_globalIsLocked( t->manager->session );
181}
182
183/**
184***
185**/
186
187static int
188compareAddresses( const struct in_addr * a,
189                  const struct in_addr * b )
190{
191    if( a->s_addr != b->s_addr )
192        return a->s_addr < b->s_addr ? -1 : 1;
193
194    return 0;
195}
196
197static int
198handshakeCompareToAddr( const void * va,
199                        const void * vb )
200{
201    const tr_handshake * a = va;
202
203    return compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
204}
205
206static int
207handshakeCompare( const void * a,
208                  const void * b )
209{
210    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
211}
212
213static tr_handshake*
214getExistingHandshake( tr_ptrArray *          handshakes,
215                      const struct in_addr * in_addr )
216{
217    return tr_ptrArrayFindSorted( handshakes,
218                                  in_addr,
219                                  handshakeCompareToAddr );
220}
221
222static int
223comparePeerAtomToAddress( const void * va,
224                          const void * vb )
225{
226    const struct peer_atom * a = va;
227
228    return compareAddresses( &a->addr, vb );
229}
230
231static int
232comparePeerAtoms( const void * va,
233                  const void * vb )
234{
235    const struct peer_atom * b = vb;
236
237    return comparePeerAtomToAddress( va, &b->addr );
238}
239
240/**
241***
242**/
243
244static int
245torrentCompare( const void * va,
246                const void * vb )
247{
248    const Torrent * a = va;
249    const Torrent * b = vb;
250
251    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
252}
253
254static int
255torrentCompareToHash( const void * va,
256                      const void * vb )
257{
258    const Torrent * a = va;
259    const uint8_t * b_hash = vb;
260
261    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
262}
263
264static Torrent*
265getExistingTorrent( tr_peerMgr *    manager,
266                    const uint8_t * hash )
267{
268    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
269                                             hash,
270                                             torrentCompareToHash );
271}
272
273static int
274peerCompare( const void * va,
275             const void * vb )
276{
277    const tr_peer * a = va;
278    const tr_peer * b = vb;
279
280    return compareAddresses( &a->in_addr, &b->in_addr );
281}
282
283static int
284peerCompareToAddr( const void * va,
285                   const void * vb )
286{
287    const tr_peer * a = va;
288
289    return compareAddresses( &a->in_addr, vb );
290}
291
292static tr_peer*
293getExistingPeer( Torrent *              torrent,
294                 const struct in_addr * in_addr )
295{
296    assert( torrentIsLocked( torrent ) );
297    assert( in_addr );
298
299    return tr_ptrArrayFindSorted( torrent->peers,
300                                  in_addr,
301                                  peerCompareToAddr );
302}
303
304static struct peer_atom*
305getExistingAtom( const                  Torrent * t,
306                 const struct in_addr * addr )
307{
308    assert( torrentIsLocked( t ) );
309    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
310}
311
312static int
313peerIsInUse( const Torrent *        ct,
314             const struct in_addr * addr )
315{
316    Torrent * t = (Torrent*) ct;
317
318    assert( torrentIsLocked ( t ) );
319
320    return getExistingPeer( t, addr )
321           || getExistingHandshake( t->outgoingHandshakes, addr )
322           || getExistingHandshake( t->manager->incomingHandshakes, addr );
323}
324
325static tr_peer*
326peerConstructor( const struct in_addr * in_addr )
327{
328    tr_peer * p;
329
330    p = tr_new0( tr_peer, 1 );
331    memcpy( &p->in_addr, in_addr, sizeof( struct in_addr ) );
332    p->pieceSpeed[TR_CLIENT_TO_PEER] = tr_rcInit( );
333    p->pieceSpeed[TR_PEER_TO_CLIENT] = tr_rcInit( );
334    p->rawSpeed[TR_CLIENT_TO_PEER] = tr_rcInit( );
335    p->rawSpeed[TR_PEER_TO_CLIENT] = tr_rcInit( );
336    return p;
337}
338
339static tr_peer*
340getPeer( Torrent *              torrent,
341         const struct in_addr * in_addr )
342{
343    tr_peer * peer;
344
345    assert( torrentIsLocked( torrent ) );
346
347    peer = getExistingPeer( torrent, in_addr );
348
349    if( peer == NULL )
350    {
351        peer = peerConstructor( in_addr );
352        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
353    }
354
355    return peer;
356}
357
358static void
359peerDestructor( tr_peer * peer )
360{
361    assert( peer );
362    assert( peer->msgs );
363
364    tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
365    tr_peerMsgsFree( peer->msgs );
366
367    tr_peerIoFree( peer->io );
368
369    tr_bitfieldFree( peer->have );
370    tr_bitfieldFree( peer->blame );
371    tr_free( peer->client );
372
373    tr_rcClose( peer->rawSpeed[TR_CLIENT_TO_PEER] );
374    tr_rcClose( peer->rawSpeed[TR_PEER_TO_CLIENT] );
375    tr_rcClose( peer->pieceSpeed[TR_CLIENT_TO_PEER] );
376    tr_rcClose( peer->pieceSpeed[TR_PEER_TO_CLIENT] );
377    tr_free( peer );
378}
379
380static void
381removePeer( Torrent * t,
382            tr_peer * peer )
383{
384    tr_peer *          removed;
385    struct peer_atom * atom;
386
387    assert( torrentIsLocked( t ) );
388
389    atom = getExistingAtom( t, &peer->in_addr );
390    assert( atom );
391    atom->time = time( NULL );
392
393    removed = tr_ptrArrayRemoveSorted( t->peers, peer, peerCompare );
394    assert( removed == peer );
395    peerDestructor( removed );
396}
397
398static void
399removeAllPeers( Torrent * t )
400{
401    while( !tr_ptrArrayEmpty( t->peers ) )
402        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
403}
404
405static void
406torrentDestructor( void * vt )
407{
408    Torrent * t = vt;
409    uint8_t   hash[SHA_DIGEST_LENGTH];
410
411    assert( t );
412    assert( !t->isRunning );
413    assert( t->peers );
414    assert( torrentIsLocked( t ) );
415    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
416    assert( tr_ptrArrayEmpty( t->peers ) );
417
418    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
419
420    tr_timerFree( &t->reconnectTimer );
421    tr_timerFree( &t->rechokeTimer );
422    tr_timerFree( &t->refillTimer );
423
424    tr_ptrArrayFree( t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
425    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
426    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
427    tr_ptrArrayFree( t->peers, NULL );
428
429    tr_free( t->pendingRequestCount );
430    tr_free( t );
431}
432
433static void peerCallbackFunc( void * vpeer,
434                              void * vevent,
435                              void * vt );
436
437static Torrent*
438torrentConstructor( tr_peerMgr * manager,
439                    tr_torrent * tor )
440{
441    int       i;
442    Torrent * t;
443
444    t = tr_new0( Torrent, 1 );
445    t->manager = manager;
446    t->tor = tor;
447    t->pool = tr_ptrArrayNew( );
448    t->peers = tr_ptrArrayNew( );
449    t->webseeds = tr_ptrArrayNew( );
450    t->outgoingHandshakes = tr_ptrArrayNew( );
451    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
452
453    for( i = 0; i < tor->info.webseedCount; ++i )
454    {
455        tr_webseed * w =
456            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc,
457                           t );
458        tr_ptrArrayAppend( t->webseeds, w );
459    }
460
461    return t;
462}
463
464/**
465 * For explanation, see http://www.bittorrent.org/fast_extensions.html
466 * Also see the "test-allowed-set" unit test
467 *
468 * @param k number of pieces in set
469 * @param sz number of pieces in the torrent
470 * @param infohash torrent's SHA1 hash
471 * @param ip peer's address
472 */
473struct tr_bitfield *
474tr_peerMgrGenerateAllowedSet(
475    const uint32_t         k,
476    const uint32_t         sz,
477    const                  uint8_t        *
478                           infohash,
479    const struct in_addr * ip )
480{
481    uint8_t       w[SHA_DIGEST_LENGTH + 4];
482    uint8_t       x[SHA_DIGEST_LENGTH];
483    tr_bitfield * a;
484    uint32_t      a_size;
485
486    *(uint32_t*)w = ntohl( htonl( ip->s_addr ) & 0xffffff00 );   /* (1) */
487    memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );              /* (2) */
488    tr_sha1( x, w, sizeof( w ), NULL );                        /* (3) */
489
490    a = tr_bitfieldNew( sz );
491    a_size = 0;
492
493    while( a_size < k )
494    {
495        int i;
496        for( i = 0; i < 5 && a_size < k; ++i )                      /* (4) */
497        {
498            uint32_t j = i * 4;                                /* (5) */
499            uint32_t y = ntohl( *( uint32_t* )( x + j ) );             /* (6) */
500            uint32_t index = y % sz;                           /* (7) */
501            if( !tr_bitfieldHas( a, index ) )                  /* (8) */
502            {
503                tr_bitfieldAdd( a, index );                    /* (9) */
504                ++a_size;
505            }
506        }
507        tr_sha1( x, x, sizeof( x ), NULL );                    /* (3) */
508    }
509
510    return a;
511}
512
513static int bandwidthPulse( void * vmgr );
514
515
516tr_peerMgr*
517tr_peerMgrNew( tr_session * session )
518{
519    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
520
521    m->session = session;
522    m->torrents = tr_ptrArrayNew( );
523    m->incomingHandshakes = tr_ptrArrayNew( );
524    m->bandwidthTimer = tr_timerNew( session, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
525    return m;
526}
527
528void
529tr_peerMgrFree( tr_peerMgr * manager )
530{
531    managerLock( manager );
532
533    tr_timerFree( &manager->bandwidthTimer );
534
535    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
536     * the item from manager->handshakes, so this is a little roundabout... */
537    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
538        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
539
540    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
541
542    /* free the torrents. */
543    tr_ptrArrayFree( manager->torrents, torrentDestructor );
544
545    managerUnlock( manager );
546    tr_free( manager );
547}
548
549static tr_peer**
550getConnectedPeers( Torrent * t,
551                   int *     setmeCount )
552{
553    int       i, peerCount, connectionCount;
554    tr_peer **peers;
555    tr_peer **ret;
556
557    assert( torrentIsLocked( t ) );
558
559    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
560    ret = tr_new( tr_peer *, peerCount );
561
562    for( i = connectionCount = 0; i < peerCount; ++i )
563        if( peers[i]->msgs )
564            ret[connectionCount++] = peers[i];
565
566    *setmeCount = connectionCount;
567    return ret;
568}
569
570static int
571clientIsDownloadingFrom( const tr_peer * peer )
572{
573    return peer->clientIsInterested && !peer->clientIsChoked;
574}
575
576static int
577clientIsUploadingTo( const tr_peer * peer )
578{
579    return peer->peerIsInterested && !peer->peerIsChoked;
580}
581
582/***
583****
584***/
585
586int
587tr_peerMgrPeerIsSeed( const tr_peerMgr *     mgr,
588                      const uint8_t *        torrentHash,
589                      const struct in_addr * addr )
590{
591    int                      isSeed = FALSE;
592    const Torrent *          t = NULL;
593    const struct peer_atom * atom = NULL;
594
595    t = getExistingTorrent( (tr_peerMgr*)mgr, torrentHash );
596    if( t )
597        atom = getExistingAtom( t, addr );
598    if( atom )
599        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
600
601    return isSeed;
602}
603
604/****
605*****
606*****  REFILL
607*****
608****/
609
610static void
611assertValidPiece( Torrent * t, tr_piece_index_t piece )
612{
613    assert( t );
614    assert( t->tor );
615    assert( piece < t->tor->info.pieceCount );
616}
617
618static int
619getPieceRequests( Torrent * t, tr_piece_index_t piece )
620{
621    assertValidPiece( t, piece );
622
623    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
624}
625
626static void
627incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
628{
629    assertValidPiece( t, piece );
630
631    if( t->pendingRequestCount == NULL )
632        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
633    t->pendingRequestCount[piece]++;
634}
635
636static void
637decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
638{
639    assertValidPiece( t, piece );
640
641    if( t->pendingRequestCount )
642        t->pendingRequestCount[piece]--;
643}
644
645struct tr_refill_piece
646{
647    tr_priority_t    priority;
648    uint32_t         piece;
649    uint32_t         peerCount;
650    int              random;
651    int              pendingRequestCount;
652    int              missingBlockCount;
653};
654
655static int
656compareRefillPiece( const void * aIn,
657                    const void * bIn )
658{
659    const struct tr_refill_piece * a = aIn;
660    const struct tr_refill_piece * b = bIn;
661
662    /* if one piece has a higher priority, it goes first */
663    if( a->priority != b->priority )
664        return a->priority > b->priority ? -1 : 1;
665
666    /* have a per-priority endgame */
667    if( a->pendingRequestCount != b->pendingRequestCount )
668        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
669
670    /* fewer missing pieces goes first */
671    if( a->missingBlockCount != b->missingBlockCount )
672        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
673
674    /* otherwise if one has fewer peers, it goes first */
675    if( a->peerCount != b->peerCount )
676        return a->peerCount < b->peerCount ? -1 : 1;
677
678    /* otherwise go with our random seed */
679    if( a->random != b->random )
680        return a->random < b->random ? -1 : 1;
681
682    return 0;
683}
684
685static tr_piece_index_t *
686getPreferredPieces( Torrent           * t,
687                    tr_piece_index_t  * pieceCount )
688{
689    const tr_torrent  * tor = t->tor;
690    const tr_info     * inf = &tor->info;
691    tr_piece_index_t    i;
692    tr_piece_index_t    poolSize = 0;
693    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
694    int                 peerCount;
695    tr_peer**           peers;
696
697    assert( torrentIsLocked( t ) );
698
699    peers = getConnectedPeers( t, &peerCount );
700
701    /* make a list of the pieces that we want but don't have */
702    for( i = 0; i < inf->pieceCount; ++i )
703        if( !tor->info.pieces[i].dnd
704                && !tr_cpPieceIsComplete( tor->completion, i ) )
705            pool[poolSize++] = i;
706
707    /* sort the pool by which to request next */
708    if( poolSize > 1 )
709    {
710        tr_piece_index_t j;
711        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
712
713        for( j = 0; j < poolSize; ++j )
714        {
715            int k;
716            const tr_piece_index_t piece = pool[j];
717            struct tr_refill_piece * setme = p + j;
718
719            setme->piece = piece;
720            setme->priority = inf->pieces[piece].priority;
721            setme->peerCount = 0;
722            setme->random = tr_cryptoWeakRandInt( INT_MAX );
723            setme->pendingRequestCount = getPieceRequests( t, piece );
724            setme->missingBlockCount
725                         = tr_cpMissingBlocksInPiece( tor->completion, piece );
726
727            for( k = 0; k < peerCount; ++k )
728            {
729                const tr_peer * peer = peers[k];
730                if( peer->peerIsInterested
731                        && !peer->clientIsChoked
732                        && tr_bitfieldHas( peer->have, piece ) )
733                    ++setme->peerCount;
734            }
735        }
736
737        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
738               compareRefillPiece );
739
740        for( j = 0; j < poolSize; ++j )
741            pool[j] = p[j].piece;
742
743        tr_free( p );
744    }
745
746    tr_free( peers );
747
748    *pieceCount = poolSize;
749    return pool;
750}
751
752struct tr_blockIterator
753{
754    Torrent * t;
755    tr_block_index_t blockIndex, blockCount, *blocks;
756    tr_piece_index_t pieceIndex, pieceCount, *pieces;
757};
758
759static struct tr_blockIterator*
760blockIteratorNew( Torrent * t )
761{
762    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
763    i->t = t;
764    i->pieces = getPreferredPieces( t, &i->pieceCount );
765    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCount );
766    return i;
767}
768
769static int
770blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
771{
772    int found;
773    Torrent * t = i->t;
774    tr_torrent * tor = t->tor;
775
776    while( ( i->blockIndex == i->blockCount )
777        && ( i->pieceIndex < i->pieceCount ) )
778    {
779        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
780        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
781        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
782        tr_block_index_t block;
783
784        assert( index < tor->info.pieceCount );
785
786        i->blockCount = 0;
787        i->blockIndex = 0;
788        for( block=b; block!=e; ++block )
789            if( !tr_cpBlockIsComplete( tor->completion, block ) )
790                i->blocks[i->blockCount++] = block;
791    }
792
793    if(( found = ( i->blockIndex < i->blockCount )))
794        *setme = i->blocks[i->blockIndex++];
795
796    return found;
797}
798
799static void
800blockIteratorFree( struct tr_blockIterator * i )
801{
802    tr_free( i->blocks );
803    tr_free( i->pieces );
804    tr_free( i );
805}
806
807static tr_peer**
808getPeersUploadingToClient( Torrent * t,
809                           int *     setmeCount )
810{
811    int j;
812    int peerCount = 0;
813    int retCount = 0;
814    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
815    tr_peer ** ret = tr_new( tr_peer *, peerCount );
816
817    j = 0; /* this is a temporary test to make sure we walk through all the peers */
818    if( peerCount )
819    {
820        /* Get a list of peers we're downloading from.
821           Pick a different starting point each time so all peers
822           get a chance at being the first in line */
823        const int fencepost = tr_cryptoWeakRandInt( peerCount );
824        int i = fencepost;
825        do {
826            if( clientIsDownloadingFrom( peers[i] ) )
827                ret[retCount++] = peers[i];
828            i = ( i + 1 ) % peerCount;
829            ++j;
830        } while( i != fencepost );
831    }
832    assert( j == peerCount );
833    *setmeCount = retCount;
834    return ret;
835}
836
837static uint32_t
838getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
839{
840    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
841    const uint64_t blockPos = tor->blockSize * b;
842    assert( blockPos >= piecePos );
843    return (uint32_t)( blockPos - piecePos );
844}
845
846static int
847refillPulse( void * vtorrent )
848{
849    tr_block_index_t block;
850    int peerCount;
851    int webseedCount;
852    tr_peer ** peers;
853    tr_webseed ** webseeds;
854    struct tr_blockIterator * blockIterator;
855    Torrent * t = vtorrent;
856    tr_torrent * tor = t->tor;
857
858    if( !t->isRunning )
859        return TRUE;
860    if( tr_torrentIsSeed( t->tor ) )
861        return TRUE;
862
863    torrentLock( t );
864    tordbg( t, "Refilling Request Buffers..." );
865
866    blockIterator = blockIteratorNew( t );
867    peers = getPeersUploadingToClient( t, &peerCount );
868    webseedCount = tr_ptrArraySize( t->webseeds );
869    webseeds = tr_memdup( tr_ptrArrayBase( t->webseeds ),
870                          webseedCount * sizeof( tr_webseed* ) );
871
872    while( ( webseedCount || peerCount )
873        && blockIteratorNext( blockIterator, &block ) )
874    {
875        int j;
876        int handled = FALSE;
877
878        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
879        const uint32_t offset = getBlockOffsetInPiece( tor, block );
880        const uint32_t length = tr_torBlockCountBytes( tor, block );
881
882        /* find a peer who can ask for this block */
883        for( j=0; !handled && j<peerCount; )
884        {
885            const int val = tr_peerMsgsAddRequest( peers[j]->msgs,
886                                                   index, offset, length );
887            switch( val )
888            {
889                case TR_ADDREQ_FULL:
890                case TR_ADDREQ_CLIENT_CHOKED:
891                    peers[j] = peers[--peerCount];
892                    break;
893
894                case TR_ADDREQ_MISSING:
895                case TR_ADDREQ_DUPLICATE:
896                    ++j;
897                    break;
898
899                case TR_ADDREQ_OK:
900                    incrementPieceRequests( t, index );
901                    handled = TRUE;
902                    break;
903
904                default:
905                    assert( 0 && "unhandled value" );
906                    break;
907            }
908        }
909
910        /* maybe one of the webseeds can do it */
911        for( j=0; !handled && j<webseedCount; )
912        {
913            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j],
914                                                          index, offset, length );
915            switch( val )
916            {
917                case TR_ADDREQ_FULL:
918                    webseeds[j] = webseeds[--webseedCount];
919                    break;
920
921                case TR_ADDREQ_OK:
922                    incrementPieceRequests( t, index );
923                    handled = TRUE;
924                    break;
925
926                default:
927                    assert( 0 && "unhandled value" );
928                    break;
929            }
930        }
931    }
932
933    /* cleanup */
934    blockIteratorFree( blockIterator );
935    tr_free( webseeds );
936    tr_free( peers );
937
938    t->refillTimer = NULL;
939    torrentUnlock( t );
940    return FALSE;
941}
942
943static void
944broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
945{
946    int i, size;
947    tr_peer ** peers;
948
949    assert( torrentIsLocked( t ) );
950
951    peers = getConnectedPeers( t, &size );
952    for( i=0; i<size; ++i )
953        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
954    tr_free( peers );
955}
956
957static void
958addStrike( Torrent * t,
959           tr_peer * peer )
960{
961    tordbg( t, "increasing peer %s strike count to %d",
962            tr_peerIoAddrStr( &peer->in_addr,
963                              peer->port ), peer->strikes + 1 );
964
965    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
966    {
967        struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
968        atom->myflags |= MYFLAG_BANNED;
969        peer->doPurge = 1;
970        tordbg( t, "banning peer %s",
971               tr_peerIoAddrStr( &atom->addr, atom->port ) );
972    }
973}
974
975static void
976gotBadPiece( Torrent *        t,
977             tr_piece_index_t pieceIndex )
978{
979    tr_torrent *   tor = t->tor;
980    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
981
982    tor->corruptCur += byteCount;
983    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
984}
985
986static void
987refillSoon( Torrent * t )
988{
989    if( t->refillTimer == NULL )
990        t->refillTimer = tr_timerNew( t->manager->session,
991                                      refillPulse, t,
992                                      REFILL_PERIOD_MSEC );
993}
994
995static void
996peerCallbackFunc( void * vpeer,
997                  void * vevent,
998                  void * vt )
999{
1000    tr_peer *             peer = vpeer; /* may be NULL if peer is a webseed */
1001    Torrent *             t = (Torrent *) vt;
1002    const tr_peer_event * e = vevent;
1003
1004    torrentLock( t );
1005
1006    switch( e->eventType )
1007    {
1008        case TR_PEER_NEED_REQ:
1009            refillSoon( t );
1010            break;
1011
1012        case TR_PEER_CANCEL:
1013            decrementPieceRequests( t, e->pieceIndex );
1014            break;
1015
1016        case TR_PEER_PEER_GOT_DATA:
1017        {
1018            const time_t now = time( NULL );
1019            tr_torrent * tor = t->tor;
1020            const tr_direction dir = TR_CLIENT_TO_PEER;
1021
1022            tor->activityDate = now;
1023
1024            if( e->wasPieceData )
1025                tor->uploadedCur += e->length;
1026
1027            /* add it to the raw upload speed */
1028            if( peer )
1029                tr_rcTransferred ( peer->rawSpeed[dir], e->length );
1030
1031            /* maybe add it to the piece upload speed */
1032            if( e->wasPieceData ) {
1033                if( peer )
1034                    tr_rcTransferred ( peer->pieceSpeed[dir], e->length );
1035            }
1036
1037            tr_bandwidthUsed( tor->bandwidth[dir], e->length, e->wasPieceData );
1038            tr_bandwidthUsed( tor->session->bandwidth[dir], e->length, e->wasPieceData );
1039
1040            /* update the stats */
1041            if( e->wasPieceData )
1042                tr_statsAddUploaded( tor->session, e->length );
1043
1044            /* update our atom */
1045            if( peer ) {
1046                struct peer_atom * a = getExistingAtom( t, &peer->in_addr );
1047                a->piece_data_time = now;
1048            }
1049
1050            break;
1051        }
1052
1053        case TR_PEER_CLIENT_GOT_DATA:
1054        {
1055            const time_t now = time( NULL );
1056            tr_torrent * tor = t->tor;
1057            const tr_direction dir = TR_PEER_TO_CLIENT;
1058
1059            tor->activityDate = now;
1060
1061            /* only add this to downloadedCur if we got it from a peer --
1062             * webseeds shouldn't count against our ratio.  As one tracker
1063             * admin put it, "Those pieces are downloaded directly from the
1064             * content distributor, not the peers, it is the tracker's job
1065             * to manage the swarms, not the web server and does not fit
1066             * into the jurisdiction of the tracker." */
1067            if( peer && e->wasPieceData )
1068                tor->downloadedCur += e->length;
1069
1070            /* add it to our raw download speed */
1071            if( peer )
1072                tr_rcTransferred ( peer->rawSpeed[dir], e->length );
1073
1074            /* maybe add it to the piece upload speed */
1075            if( e->wasPieceData ) {
1076                if( peer )
1077                    tr_rcTransferred ( peer->pieceSpeed[dir], e->length );
1078            }
1079
1080            tr_bandwidthUsed( tor->bandwidth[dir], e->length, e->wasPieceData );
1081            tr_bandwidthUsed( tor->session->bandwidth[dir], e->length, e->wasPieceData );
1082     
1083            /* update the stats */ 
1084            if( e->wasPieceData )
1085                tr_statsAddDownloaded( tor->session, e->length );
1086
1087            /* update our atom */
1088            if( peer ) {
1089                struct peer_atom * a = getExistingAtom( t, &peer->in_addr );
1090                a->piece_data_time = now;
1091            }
1092
1093            break;
1094        }
1095
1096        case TR_PEER_PEER_PROGRESS:
1097        {
1098            if( peer )
1099            {
1100                struct peer_atom * atom = getExistingAtom( t,
1101                                                           &peer->in_addr );
1102                const int          peerIsSeed = e->progress >= 1.0;
1103                if( peerIsSeed )
1104                {
1105                    tordbg( t, "marking peer %s as a seed",
1106                           tr_peerIoAddrStr( &atom->addr,
1107                                             atom->port ) );
1108                    atom->flags |= ADDED_F_SEED_FLAG;
1109                }
1110                else
1111                {
1112                    tordbg( t, "marking peer %s as a non-seed",
1113                           tr_peerIoAddrStr( &atom->addr,
1114                                             atom->port ) );
1115                    atom->flags &= ~ADDED_F_SEED_FLAG;
1116                }
1117            }
1118            break;
1119        }
1120
1121        case TR_PEER_CLIENT_GOT_BLOCK:
1122        {
1123            tr_torrent *     tor = t->tor;
1124
1125            tr_block_index_t block = _tr_block( tor, e->pieceIndex,
1126                                                e->offset );
1127
1128            tr_cpBlockAdd( tor->completion, block );
1129            decrementPieceRequests( t, e->pieceIndex );
1130
1131            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1132
1133            if( tr_cpPieceIsComplete( tor->completion, e->pieceIndex ) )
1134            {
1135                const tr_piece_index_t p = e->pieceIndex;
1136                const int              ok = tr_ioTestPiece( tor, p );
1137
1138                if( !ok )
1139                {
1140                    tr_torerr( tor,
1141                              _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1142                              (unsigned long)p );
1143                }
1144
1145                tr_torrentSetHasPiece( tor, p, ok );
1146                tr_torrentSetPieceChecked( tor, p, TRUE );
1147                tr_peerMgrSetBlame( tor->session->peerMgr, tor->info.hash, p, ok );
1148
1149                if( !ok )
1150                    gotBadPiece( t, p );
1151                else
1152                {
1153                    int        i, peerCount;
1154                    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1155                    for( i = 0; i < peerCount; ++i )
1156                        tr_peerMsgsHave( peers[i]->msgs, p );
1157                    tr_free( peers );
1158                }
1159
1160                tr_torrentRecheckCompleteness( tor );
1161            }
1162            break;
1163        }
1164
1165        case TR_PEER_ERROR:
1166            if( e->err == EINVAL )
1167            {
1168                addStrike( t, peer );
1169                peer->doPurge = 1;
1170            }
1171            else if( ( e->err == ERANGE )
1172                  || ( e->err == EMSGSIZE )
1173                  || ( e->err == ENOTCONN ) )
1174            {
1175                /* some protocol error from the peer */
1176                peer->doPurge = 1;
1177            }
1178            else /* a local error, such as an IO error */
1179            {
1180                t->tor->error = e->err;
1181                tr_strlcpy( t->tor->errorString,
1182                            tr_strerror( t->tor->error ),
1183                            sizeof( t->tor->errorString ) );
1184                tr_torrentStop( t->tor );
1185            }
1186            break;
1187
1188        default:
1189            assert( 0 );
1190    }
1191
1192    torrentUnlock( t );
1193}
1194
1195static void
1196ensureAtomExists( Torrent *              t,
1197                  const struct in_addr * addr,
1198                  uint16_t               port,
1199                  uint8_t                flags,
1200                  uint8_t                from )
1201{
1202    if( getExistingAtom( t, addr ) == NULL )
1203    {
1204        struct peer_atom * a;
1205        a = tr_new0( struct peer_atom, 1 );
1206        a->addr = *addr;
1207        a->port = port;
1208        a->flags = flags;
1209        a->from = from;
1210        tordbg( t, "got a new atom: %s",
1211               tr_peerIoAddrStr( &a->addr, a->port ) );
1212        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
1213    }
1214}
1215
1216static int
1217getMaxPeerCount( const tr_torrent * tor )
1218{
1219    return tor->maxConnectedPeers;
1220}
1221
1222static int
1223getPeerCount( const Torrent * t )
1224{
1225    return tr_ptrArraySize( t->peers ) + tr_ptrArraySize(
1226               t->outgoingHandshakes );
1227}
1228
1229/* FIXME: this is kind of a mess. */
1230static int
1231myHandshakeDoneCB( tr_handshake *  handshake,
1232                   tr_peerIo *     io,
1233                   int             isConnected,
1234                   const uint8_t * peer_id,
1235                   void *          vmanager )
1236{
1237    int                    ok = isConnected;
1238    int                    success = FALSE;
1239    uint16_t               port;
1240    const struct in_addr * addr;
1241    tr_peerMgr *           manager = (tr_peerMgr*) vmanager;
1242    Torrent *              t;
1243    tr_handshake *         ours;
1244
1245    assert( io );
1246    assert( isConnected == 0 || isConnected == 1 );
1247
1248    t = tr_peerIoHasTorrentHash( io )
1249        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1250        : NULL;
1251
1252    if( tr_peerIoIsIncoming ( io ) )
1253        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
1254                                        handshake, handshakeCompare );
1255    else if( t )
1256        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
1257                                        handshake, handshakeCompare );
1258    else
1259        ours = handshake;
1260
1261    assert( ours );
1262    assert( ours == handshake );
1263
1264    if( t )
1265        torrentLock( t );
1266
1267    addr = tr_peerIoGetAddress( io, &port );
1268
1269    if( !ok || !t || !t->isRunning )
1270    {
1271        if( t )
1272        {
1273            struct peer_atom * atom = getExistingAtom( t, addr );
1274            if( atom )
1275                ++atom->numFails;
1276        }
1277
1278        tr_peerIoFree( io );
1279    }
1280    else /* looking good */
1281    {
1282        struct peer_atom * atom;
1283        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1284        atom = getExistingAtom( t, addr );
1285        atom->time = time( NULL );
1286
1287        if( atom->myflags & MYFLAG_BANNED )
1288        {
1289            tordbg( t, "banned peer %s tried to reconnect",
1290                   tr_peerIoAddrStr( &atom->addr,
1291                                     atom->port ) );
1292            tr_peerIoFree( io );
1293        }
1294        else if( tr_peerIoIsIncoming( io )
1295               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1296
1297        {
1298            tr_peerIoFree( io );
1299        }
1300        else
1301        {
1302            tr_peer * peer = getExistingPeer( t, addr );
1303
1304            if( peer ) /* we already have this peer */
1305            {
1306                tr_peerIoFree( io );
1307            }
1308            else
1309            {
1310                peer = getPeer( t, addr );
1311                tr_free( peer->client );
1312
1313                if( !peer_id )
1314                    peer->client = NULL;
1315                else
1316                {
1317                    char client[128];
1318                    tr_clientForId( client, sizeof( client ), peer_id );
1319                    peer->client = tr_strdup( client );
1320                }
1321                peer->port = port;
1322                peer->io = io;
1323                peer->msgs =
1324                    tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t,
1325                                    &peer->msgsTag );
1326
1327                success = TRUE;
1328            }
1329        }
1330    }
1331
1332    if( t )
1333        torrentUnlock( t );
1334
1335    return success;
1336}
1337
1338void
1339tr_peerMgrAddIncoming( tr_peerMgr *     manager,
1340                       struct in_addr * addr,
1341                       uint16_t         port,
1342                       int              socket )
1343{
1344    managerLock( manager );
1345
1346    if( tr_sessionIsAddressBlocked( manager->session, addr ) )
1347    {
1348        tr_dbg( "Banned IP address \"%s\" tried to connect to us",
1349               inet_ntoa( *addr ) );
1350        tr_netClose( socket );
1351    }
1352    else if( getExistingHandshake( manager->incomingHandshakes, addr ) )
1353    {
1354        tr_netClose( socket );
1355    }
1356    else /* we don't have a connetion to them yet... */
1357    {
1358        tr_peerIo *    io;
1359        tr_handshake * handshake;
1360
1361        io = tr_peerIoNewIncoming( manager->session, addr, port, socket );
1362
1363        handshake = tr_handshakeNew( io,
1364                                     manager->session->encryptionMode,
1365                                     myHandshakeDoneCB,
1366                                     manager );
1367
1368        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake,
1369                                 handshakeCompare );
1370    }
1371
1372    managerUnlock( manager );
1373}
1374
1375void
1376tr_peerMgrAddPex( tr_peerMgr *    manager,
1377                  const uint8_t * torrentHash,
1378                  uint8_t         from,
1379                  const tr_pex *  pex )
1380{
1381    Torrent * t;
1382
1383    managerLock( manager );
1384
1385    t = getExistingTorrent( manager, torrentHash );
1386    if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->in_addr ) )
1387        ensureAtomExists( t, &pex->in_addr, pex->port, pex->flags, from );
1388
1389    managerUnlock( manager );
1390}
1391
1392tr_pex *
1393tr_peerMgrCompactToPex( const void *    compact,
1394                        size_t          compactLen,
1395                        const uint8_t * added_f,
1396                        size_t          added_f_len,
1397                        size_t *        pexCount )
1398{
1399    size_t          i;
1400    size_t          n = compactLen / 6;
1401    const uint8_t * walk = compact;
1402    tr_pex *        pex = tr_new0( tr_pex, n );
1403
1404    for( i = 0; i < n; ++i )
1405    {
1406        memcpy( &pex[i].in_addr, walk, 4 ); walk += 4;
1407        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1408        if( added_f && ( n == added_f_len ) )
1409            pex[i].flags = added_f[i];
1410    }
1411
1412    *pexCount = n;
1413    return pex;
1414}
1415
1416/**
1417***
1418**/
1419
1420void
1421tr_peerMgrSetBlame( tr_peerMgr *     manager,
1422                    const uint8_t *  torrentHash,
1423                    tr_piece_index_t pieceIndex,
1424                    int              success )
1425{
1426    if( !success )
1427    {
1428        int        peerCount, i;
1429        Torrent *  t = getExistingTorrent( manager, torrentHash );
1430        tr_peer ** peers;
1431
1432        assert( torrentIsLocked( t ) );
1433
1434        peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1435        for( i = 0; i < peerCount; ++i )
1436        {
1437            tr_peer * peer = peers[i];
1438            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1439            {
1440                tordbg(
1441                    t,
1442                    "peer %s contributed to corrupt piece (%d); now has %d strikes",
1443                    tr_peerIoAddrStr( &peer->in_addr, peer->port ),
1444                    pieceIndex, (int)peer->strikes + 1 );
1445                addStrike( t, peer );
1446            }
1447        }
1448    }
1449}
1450
1451int
1452tr_pexCompare( const void * va,
1453               const void * vb )
1454{
1455    const tr_pex * a = va;
1456    const tr_pex * b = vb;
1457    int            i =
1458        memcmp( &a->in_addr, &b->in_addr, sizeof( struct in_addr ) );
1459
1460    if( i ) return i;
1461    if( a->port < b->port ) return -1;
1462    if( a->port > b->port ) return 1;
1463    return 0;
1464}
1465
1466int tr_pexCompare( const void * a,
1467                   const void * b );
1468
1469static int
1470peerPrefersCrypto( const tr_peer * peer )
1471{
1472    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1473        return TRUE;
1474
1475    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1476        return FALSE;
1477
1478    return tr_peerIoIsEncrypted( peer->io );
1479}
1480
1481int
1482tr_peerMgrGetPeers( tr_peerMgr *    manager,
1483                    const uint8_t * torrentHash,
1484                    tr_pex **       setme_pex )
1485{
1486    int peerCount = 0;
1487    const Torrent *  t;
1488
1489    managerLock( manager );
1490
1491    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1492    if( !t )
1493    {
1494        *setme_pex = NULL;
1495    }
1496    else
1497    {
1498        int i;
1499        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1500        tr_pex * pex = tr_new( tr_pex, peerCount );
1501        tr_pex * walk = pex;
1502
1503        for( i = 0; i < peerCount; ++i, ++walk )
1504        {
1505            const tr_peer * peer = peers[i];
1506            walk->in_addr = peer->in_addr;
1507            walk->port = peer->port;
1508            walk->flags = 0;
1509            if( peerPrefersCrypto( peer ) ) walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1510            if( peer->progress >= 1.0 ) walk->flags |= ADDED_F_SEED_FLAG;
1511        }
1512
1513        assert( ( walk - pex ) == peerCount );
1514        qsort( pex, peerCount, sizeof( tr_pex ), tr_pexCompare );
1515        *setme_pex = pex;
1516    }
1517
1518    managerUnlock( manager );
1519    return peerCount;
1520}
1521
1522static int reconnectPulse( void * vtorrent );
1523
1524static int rechokePulse( void * vtorrent );
1525
1526void
1527tr_peerMgrStartTorrent( tr_peerMgr *    manager,
1528                        const uint8_t * torrentHash )
1529{
1530    Torrent * t;
1531
1532    managerLock( manager );
1533
1534    t = getExistingTorrent( manager, torrentHash );
1535
1536    assert( t );
1537    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1538    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1539
1540    if( !t->isRunning )
1541    {
1542        t->isRunning = 1;
1543
1544        t->reconnectTimer = tr_timerNew( t->manager->session,
1545                                         reconnectPulse, t,
1546                                         RECONNECT_PERIOD_MSEC );
1547
1548        t->rechokeTimer = tr_timerNew( t->manager->session,
1549                                       rechokePulse, t,
1550                                       RECHOKE_PERIOD_MSEC );
1551
1552        reconnectPulse( t );
1553
1554        rechokePulse( t );
1555
1556        if( !tr_ptrArrayEmpty( t->webseeds ) )
1557            refillSoon( t );
1558    }
1559
1560    managerUnlock( manager );
1561}
1562
1563static void
1564stopTorrent( Torrent * t )
1565{
1566    assert( torrentIsLocked( t ) );
1567
1568    t->isRunning = 0;
1569    tr_timerFree( &t->rechokeTimer );
1570    tr_timerFree( &t->reconnectTimer );
1571
1572    /* disconnect the peers. */
1573    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)peerDestructor );
1574    tr_ptrArrayClear( t->peers );
1575
1576    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1577     * which removes the handshake from t->outgoingHandshakes... */
1578    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1579        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1580}
1581
1582void
1583tr_peerMgrStopTorrent( tr_peerMgr *    manager,
1584                       const uint8_t * torrentHash )
1585{
1586    managerLock( manager );
1587
1588    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1589
1590    managerUnlock( manager );
1591}
1592
1593void
1594tr_peerMgrAddTorrent( tr_peerMgr * manager,
1595                      tr_torrent * tor )
1596{
1597    Torrent * t;
1598
1599    managerLock( manager );
1600
1601    assert( tor );
1602    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1603
1604    t = torrentConstructor( manager, tor );
1605    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1606
1607    managerUnlock( manager );
1608}
1609
1610void
1611tr_peerMgrRemoveTorrent( tr_peerMgr *    manager,
1612                         const uint8_t * torrentHash )
1613{
1614    Torrent * t;
1615
1616    managerLock( manager );
1617
1618    t = getExistingTorrent( manager, torrentHash );
1619    assert( t );
1620    stopTorrent( t );
1621    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1622    torrentDestructor( t );
1623
1624    managerUnlock( manager );
1625}
1626
1627void
1628tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1629                               const uint8_t *    torrentHash,
1630                               int8_t *           tab,
1631                               unsigned int       tabCount )
1632{
1633    tr_piece_index_t   i;
1634    const Torrent *    t;
1635    const tr_torrent * tor;
1636    float              interval;
1637    int                isComplete;
1638    int                peerCount;
1639    const tr_peer **   peers;
1640
1641    managerLock( manager );
1642
1643    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1644    tor = t->tor;
1645    interval = tor->info.pieceCount / (float)tabCount;
1646    isComplete = tor
1647                 && ( tr_cpGetStatus ( tor->completion ) == TR_CP_COMPLETE );
1648    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1649
1650    memset( tab, 0, tabCount );
1651
1652    for( i = 0; tor && i < tabCount; ++i )
1653    {
1654        const int piece = i * interval;
1655
1656        if( isComplete || tr_cpPieceIsComplete( tor->completion, piece ) )
1657            tab[i] = -1;
1658        else if( peerCount )
1659        {
1660            int j;
1661            for( j = 0; j < peerCount; ++j )
1662                if( tr_bitfieldHas( peers[j]->have, i ) )
1663                    ++tab[i];
1664        }
1665    }
1666
1667    managerUnlock( manager );
1668}
1669
1670/* Returns the pieces that are available from peers */
1671tr_bitfield*
1672tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1673                        const uint8_t *    torrentHash )
1674{
1675    int           i, size;
1676    Torrent *     t;
1677    tr_peer **    peers;
1678    tr_bitfield * pieces;
1679
1680    managerLock( manager );
1681
1682    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1683    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1684    peers = getConnectedPeers( t, &size );
1685    for( i = 0; i < size; ++i )
1686        tr_bitfieldOr( pieces, peers[i]->have );
1687
1688    managerUnlock( manager );
1689    tr_free( peers );
1690    return pieces;
1691}
1692
1693int
1694tr_peerMgrHasConnections( const tr_peerMgr * manager,
1695                          const uint8_t *    torrentHash )
1696{
1697    int             ret;
1698    const Torrent * t;
1699
1700    managerLock( manager );
1701
1702    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1703    ret = t && ( !tr_ptrArrayEmpty( t->peers )
1704               || !tr_ptrArrayEmpty( t->webseeds ) );
1705
1706    managerUnlock( manager );
1707    return ret;
1708}
1709
1710void
1711tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1712                        const uint8_t *    torrentHash,
1713                        int *              setmePeersKnown,
1714                        int *              setmePeersConnected,
1715                        int *              setmeSeedsConnected,
1716                        int *              setmeWebseedsSendingToUs,
1717                        int *              setmePeersSendingToUs,
1718                        int *              setmePeersGettingFromUs,
1719                        int *              setmePeersFrom )
1720{
1721    int                 i, size;
1722    const Torrent *     t;
1723    const tr_peer **    peers;
1724    const tr_webseed ** webseeds;
1725
1726    managerLock( manager );
1727
1728    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1729    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1730
1731    *setmePeersKnown           = tr_ptrArraySize( t->pool );
1732    *setmePeersConnected       = 0;
1733    *setmeSeedsConnected       = 0;
1734    *setmePeersGettingFromUs   = 0;
1735    *setmePeersSendingToUs     = 0;
1736    *setmeWebseedsSendingToUs  = 0;
1737
1738    for( i = 0; i < TR_PEER_FROM__MAX; ++i )
1739        setmePeersFrom[i] = 0;
1740
1741    for( i = 0; i < size; ++i )
1742    {
1743        const tr_peer *          peer = peers[i];
1744        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1745
1746        if( peer->io == NULL ) /* not connected */
1747            continue;
1748
1749        ++ * setmePeersConnected;
1750
1751        ++setmePeersFrom[atom->from];
1752
1753        if( clientIsDownloadingFrom( peer ) )
1754            ++ * setmePeersSendingToUs;
1755
1756        if( clientIsUploadingTo( peer ) )
1757            ++ * setmePeersGettingFromUs;
1758
1759        if( atom->flags & ADDED_F_SEED_FLAG )
1760            ++ * setmeSeedsConnected;
1761    }
1762
1763    webseeds = (const tr_webseed **) tr_ptrArrayPeek( t->webseeds, &size );
1764    for( i = 0; i < size; ++i )
1765    {
1766        if( tr_webseedIsActive( webseeds[i] ) )
1767            ++ * setmeWebseedsSendingToUs;
1768    }
1769
1770    managerUnlock( manager );
1771}
1772
1773float*
1774tr_peerMgrWebSpeeds( const tr_peerMgr * manager,
1775                     const uint8_t *    torrentHash )
1776{
1777    const Torrent *     t;
1778    const tr_webseed ** webseeds;
1779    int                 i;
1780    int                 webseedCount;
1781    float *             ret;
1782
1783    assert( manager );
1784    managerLock( manager );
1785
1786    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1787    webseeds = (const tr_webseed**) tr_ptrArrayPeek( t->webseeds,
1788                                                     &webseedCount );
1789    assert( webseedCount == t->tor->info.webseedCount );
1790    ret = tr_new0( float, webseedCount );
1791
1792    for( i = 0; i < webseedCount; ++i )
1793        if( !tr_webseedGetSpeed( webseeds[i], &ret[i] ) )
1794            ret[i] = -1.0;
1795
1796    managerUnlock( manager );
1797    return ret;
1798}
1799
1800double
1801tr_peerGetPieceSpeed( const tr_peer    * peer,
1802                      tr_direction       direction )
1803{
1804    assert( peer );
1805    assert( direction==TR_CLIENT_TO_PEER || direction==TR_PEER_TO_CLIENT );
1806
1807    return tr_rcRate( peer->pieceSpeed[direction] );
1808}
1809
1810
1811struct tr_peer_stat *
1812tr_peerMgrPeerStats( const   tr_peerMgr  * manager,
1813                     const   uint8_t     * torrentHash,
1814                     int                 * setmeCount UNUSED )
1815{
1816    int             i, size;
1817    const Torrent * t;
1818    tr_peer **      peers;
1819    tr_peer_stat *  ret;
1820
1821    assert( manager );
1822    managerLock( manager );
1823
1824    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1825    peers = getConnectedPeers( (Torrent*)t, &size );
1826    ret = tr_new0( tr_peer_stat, size );
1827
1828    for( i = 0; i < size; ++i )
1829    {
1830        char *                   pch;
1831        const tr_peer *          peer = peers[i];
1832        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1833        tr_peer_stat *           stat = ret + i;
1834
1835        tr_netNtop( &peer->in_addr, stat->addr, sizeof( stat->addr ) );
1836        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
1837                   sizeof( stat->client ) );
1838        stat->port               = ntohs( peer->port );
1839        stat->from               = atom->from;
1840        stat->progress           = peer->progress;
1841        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1842        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER );
1843        stat->rateToClient       = tr_peerGetPieceSpeed( peer, TR_PEER_TO_CLIENT );
1844        stat->peerIsChoked       = peer->peerIsChoked;
1845        stat->peerIsInterested   = peer->peerIsInterested;
1846        stat->clientIsChoked     = peer->clientIsChoked;
1847        stat->clientIsInterested = peer->clientIsInterested;
1848        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1849        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1850        stat->isUploadingTo      = clientIsUploadingTo( peer );
1851
1852        pch = stat->flagStr;
1853        if( t->optimistic == peer ) *pch++ = 'O';
1854        if( stat->isDownloadingFrom ) *pch++ = 'D';
1855        else if( stat->clientIsInterested ) *pch++ = 'd';
1856        if( stat->isUploadingTo ) *pch++ = 'U';
1857        else if( stat->peerIsInterested ) *pch++ = 'u';
1858        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ =
1859                'K';
1860        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1861        if( stat->isEncrypted ) *pch++ = 'E';
1862        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1863        if( stat->isIncoming ) *pch++ = 'I';
1864        *pch = '\0';
1865    }
1866
1867    *setmeCount = size;
1868    tr_free( peers );
1869
1870    managerUnlock( manager );
1871    return ret;
1872}
1873
1874/**
1875***
1876**/
1877
1878struct ChokeData
1879{
1880    unsigned int    doUnchoke    : 1;
1881    unsigned int    isInterested : 1;
1882    unsigned int    isChoked     : 1;
1883    int             rate;
1884    tr_peer *       peer;
1885};
1886
1887static int
1888compareChoke( const void * va,
1889              const void * vb )
1890{
1891    const struct ChokeData * a = va;
1892    const struct ChokeData * b = vb;
1893
1894    if( a->rate != b->rate ) /* prefer higher overall speeds */
1895        return a->rate > b->rate ? -1 : 1;
1896
1897    if( a->isChoked != b->isChoked ) /* prefer unchoked */
1898        return a->isChoked ? 1 : -1;
1899
1900    return 0;
1901}
1902
1903static int
1904isNew( const tr_peer * peer )
1905{
1906    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1907}
1908
1909static int
1910isSame( const tr_peer * peer )
1911{
1912    return peer && peer->client && strstr( peer->client, "Transmission" );
1913}
1914
1915/**
1916***
1917**/
1918
1919static void
1920rechoke( Torrent * t )
1921{
1922    int                i, peerCount, size, unchokedInterested;
1923    tr_peer **         peers = getConnectedPeers( t, &peerCount );
1924    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1925    const int          chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
1926
1927    assert( torrentIsLocked( t ) );
1928
1929    /* sort the peers by preference and rate */
1930    for( i = 0, size = 0; i < peerCount; ++i )
1931    {
1932        tr_peer * peer = peers[i];
1933        if( peer->progress >= 1.0 ) /* choke all seeds */
1934            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1935        else if( chokeAll )
1936            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1937        else {
1938            struct ChokeData * n = &choke[size++];
1939            n->peer         = peer;
1940            n->isInterested = peer->peerIsInterested;
1941            n->isChoked     = peer->peerIsChoked;
1942            n->rate         = tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER ) * 1024;
1943        }
1944    }
1945
1946    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
1947
1948    /**
1949     * Reciprocation and number of uploads capping is managed by unchoking
1950     * the N peers which have the best upload rate and are interested.
1951     * This maximizes the client's download rate. These N peers are
1952     * referred to as downloaders, because they are interested in downloading
1953     * from the client.
1954     *
1955     * Peers which have a better upload rate (as compared to the downloaders)
1956     * but aren't interested get unchoked. If they become interested, the
1957     * downloader with the worst upload rate gets choked. If a client has
1958     * a complete file, it uses its upload rate rather than its download
1959     * rate to decide which peers to unchoke.
1960     */
1961    unchokedInterested = 0;
1962    for( i = 0; i < size && unchokedInterested < MAX_UNCHOKED_PEERS; ++i )
1963    {
1964        choke[i].doUnchoke = 1;
1965        if( choke[i].isInterested )
1966            ++unchokedInterested;
1967    }
1968
1969    /* optimistic unchoke */
1970    if( i < size )
1971    {
1972        int                n;
1973        struct ChokeData * c;
1974        tr_ptrArray *      randPool = tr_ptrArrayNew( );
1975
1976        for( ; i < size; ++i )
1977        {
1978            if( choke[i].isInterested )
1979            {
1980                const tr_peer * peer = choke[i].peer;
1981                int             x = 1, y;
1982                if( isNew( peer ) ) x *= 3;
1983                if( isSame( peer ) ) x *= 3;
1984                for( y = 0; y < x; ++y )
1985                    tr_ptrArrayAppend( randPool, &choke[i] );
1986            }
1987        }
1988
1989        if( ( n = tr_ptrArraySize( randPool ) ) )
1990        {
1991            c = tr_ptrArrayNth( randPool, tr_cryptoWeakRandInt( n ) );
1992            c->doUnchoke = 1;
1993            t->optimistic = c->peer;
1994        }
1995
1996        tr_ptrArrayFree( randPool, NULL );
1997    }
1998
1999    for( i = 0; i < size; ++i )
2000        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
2001
2002    /* cleanup */
2003    tr_free( choke );
2004    tr_free( peers );
2005}
2006
2007static int
2008rechokePulse( void * vtorrent )
2009{
2010    Torrent * t = vtorrent;
2011
2012    torrentLock( t );
2013    rechoke( t );
2014    torrentUnlock( t );
2015    return TRUE;
2016}
2017
2018/***
2019****
2020****  Life and Death
2021****
2022***/
2023
2024static int
2025shouldPeerBeClosed( const Torrent * t,
2026                    const tr_peer * peer,
2027                    int             peerCount )
2028{
2029    const tr_torrent *       tor = t->tor;
2030    const time_t             now = time( NULL );
2031    const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
2032
2033    /* if it's marked for purging, close it */
2034    if( peer->doPurge )
2035    {
2036        tordbg( t, "purging peer %s because its doPurge flag is set",
2037               tr_peerIoAddrStr( &atom->addr,
2038                                 atom->port ) );
2039        return TRUE;
2040    }
2041
2042    /* if we're seeding and the peer has everything we have,
2043     * and enough time has passed for a pex exchange, then disconnect */
2044    if( tr_torrentIsSeed( tor ) )
2045    {
2046        int peerHasEverything;
2047        if( atom->flags & ADDED_F_SEED_FLAG )
2048            peerHasEverything = TRUE;
2049        else if( peer->progress < tr_cpPercentDone( tor->completion ) )
2050            peerHasEverything = FALSE;
2051        else
2052        {
2053            tr_bitfield * tmp =
2054                tr_bitfieldDup( tr_cpPieceBitfield( tor->completion ) );
2055            tr_bitfieldDifference( tmp, peer->have );
2056            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2057            tr_bitfieldFree( tmp );
2058        }
2059        if( peerHasEverything
2060          && ( !tr_torrentAllowsPex( tor ) || ( now - atom->time >= 30 ) ) )
2061        {
2062            tordbg( t, "purging peer %s because we're both seeds",
2063                   tr_peerIoAddrStr( &atom->addr,
2064                                     atom->port ) );
2065            return TRUE;
2066        }
2067    }
2068
2069    /* disconnect if it's been too long since piece data has been transferred.
2070     * this is on a sliding scale based on number of available peers... */
2071    {
2072        const int    relaxStrictnessIfFewerThanN =
2073            (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2074        /* if we have >= relaxIfFewerThan, strictness is 100%.
2075         * if we have zero connections, strictness is 0% */
2076        const float  strictness = peerCount >= relaxStrictnessIfFewerThanN
2077                                  ? 1.0
2078                                  : peerCount /
2079                                  (float)relaxStrictnessIfFewerThanN;
2080        const int    lo = MIN_UPLOAD_IDLE_SECS;
2081        const int    hi = MAX_UPLOAD_IDLE_SECS;
2082        const int    limit = lo + ( ( hi - lo ) * strictness );
2083        const time_t then = peer->pieceDataActivityDate;
2084        const int    idleTime = then ? ( now - then ) : 0;
2085        if( idleTime > limit )
2086        {
2087            tordbg(
2088                t,
2089                "purging peer %s because it's been %d secs since we shared anything",
2090                tr_peerIoAddrStr( &atom->addr, atom->port ), idleTime );
2091            return TRUE;
2092        }
2093    }
2094
2095    return FALSE;
2096}
2097
2098static tr_peer **
2099getPeersToClose( Torrent * t,
2100                 int *     setmeSize )
2101{
2102    int               i, peerCount, outsize;
2103    tr_peer **        peers = (tr_peer**) tr_ptrArrayPeek( t->peers,
2104                                                           &peerCount );
2105    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2106
2107    assert( torrentIsLocked( t ) );
2108
2109    for( i = outsize = 0; i < peerCount; ++i )
2110        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
2111            ret[outsize++] = peers[i];
2112
2113    *setmeSize = outsize;
2114    return ret;
2115}
2116
2117static int
2118compareCandidates( const void * va,
2119                   const void * vb )
2120{
2121    const struct peer_atom * a = *(const struct peer_atom**) va;
2122    const struct peer_atom * b = *(const struct peer_atom**) vb;
2123
2124    /* <Charles> Here we would probably want to try reconnecting to
2125     * peers that had most recently given us data. Lots of users have
2126     * trouble with resets due to their routers and/or ISPs. This way we
2127     * can quickly recover from an unwanted reset. So we sort
2128     * piece_data_time in descending order.
2129     */
2130
2131    if( a->piece_data_time != b->piece_data_time )
2132        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2133
2134    if( a->numFails != b->numFails )
2135        return a->numFails < b->numFails ? -1 : 1;
2136
2137    if( a->time != b->time )
2138        return a->time < b->time ? -1 : 1;
2139
2140    return 0;
2141}
2142
2143static int
2144getReconnectIntervalSecs( const struct peer_atom * atom )
2145{
2146    int          sec;
2147    const time_t now = time( NULL );
2148
2149    /* if we were recently connected to this peer and transferring piece
2150     * data, try to reconnect to them sooner rather that later -- we don't
2151     * want network troubles to get in the way of a good peer. */
2152    if( ( now - atom->piece_data_time ) <=
2153       ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2154        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2155
2156    /* don't allow reconnects more often than our minimum */
2157    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2158        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2159
2160    /* otherwise, the interval depends on how many times we've tried
2161     * and failed to connect to the peer */
2162    else switch( atom->numFails )
2163        {
2164            case 0:
2165                sec = 0; break;
2166
2167            case 1:
2168                sec = 5; break;
2169
2170            case 2:
2171                sec = 2 * 60; break;
2172
2173            case 3:
2174                sec = 15 * 60; break;
2175
2176            case 4:
2177                sec = 30 * 60; break;
2178
2179            case 5:
2180                sec = 60 * 60; break;
2181
2182            default:
2183                sec = 120 * 60; break;
2184        }
2185
2186    return sec;
2187}
2188
2189static struct peer_atom **
2190getPeerCandidates(                               Torrent * t,
2191                                           int * setmeSize )
2192{
2193    int                 i, atomCount, retCount;
2194    struct peer_atom ** atoms;
2195    struct peer_atom ** ret;
2196    const time_t        now = time( NULL );
2197    const int           seed = tr_torrentIsSeed( t->tor );
2198
2199    assert( torrentIsLocked( t ) );
2200
2201    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &atomCount );
2202    ret = tr_new( struct peer_atom*, atomCount );
2203    for( i = retCount = 0; i < atomCount; ++i )
2204    {
2205        int                interval;
2206        struct peer_atom * atom = atoms[i];
2207
2208        /* peer fed us too much bad data ... we only keep it around
2209         * now to weed it out in case someone sends it to us via pex */
2210        if( atom->myflags & MYFLAG_BANNED )
2211            continue;
2212
2213        /* peer was unconnectable before, so we're not going to keep trying.
2214         * this is needs a separate flag from `banned', since if they try
2215         * to connect to us later, we'll let them in */
2216        if( atom->myflags & MYFLAG_UNREACHABLE )
2217            continue;
2218
2219        /* we don't need two connections to the same peer... */
2220        if( peerIsInUse( t, &atom->addr ) )
2221            continue;
2222
2223        /* no need to connect if we're both seeds... */
2224        if( seed && ( atom->flags & ADDED_F_SEED_FLAG ) )
2225            continue;
2226
2227        /* don't reconnect too often */
2228        interval = getReconnectIntervalSecs( atom );
2229        if( ( now - atom->time ) < interval )
2230        {
2231            tordbg(
2232                t,
2233                "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2234                i, tr_peerIoAddrStr( &atom->addr,
2235                                     atom->port ), interval );
2236            continue;
2237        }
2238
2239        /* Don't connect to peers in our blocklist */
2240        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2241            continue;
2242
2243        ret[retCount++] = atom;
2244    }
2245
2246    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2247    *setmeSize = retCount;
2248    return ret;
2249}
2250
2251static int
2252reconnectPulse( void * vtorrent )
2253{
2254    Torrent *     t = vtorrent;
2255    static time_t prevTime = 0;
2256    static int    newConnectionsThisSecond = 0;
2257    time_t        now;
2258
2259    torrentLock( t );
2260
2261    now = time( NULL );
2262    if( prevTime != now )
2263    {
2264        prevTime = now;
2265        newConnectionsThisSecond = 0;
2266    }
2267
2268    if( !t->isRunning )
2269    {
2270        removeAllPeers( t );
2271    }
2272    else
2273    {
2274        int                 i, nCandidates, nBad;
2275        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
2276        struct tr_peer **   connections = getPeersToClose( t, &nBad );
2277
2278        if( nBad || nCandidates )
2279            tordbg(
2280                t, "reconnect pulse for [%s]: %d bad connections, "
2281                   "%d connection candidates, %d atoms, max per pulse is %d",
2282                t->tor->info.name, nBad, nCandidates,
2283                tr_ptrArraySize( t->pool ),
2284                (int)MAX_RECONNECTIONS_PER_PULSE );
2285
2286        /* disconnect some peers.
2287           if we transferred piece data, then they might be good peers,
2288           so reset their `numFails' weight to zero.  otherwise we connected
2289           to them fruitlessly, so mark it as another fail */
2290        for( i = 0; i < nBad; ++i )
2291        {
2292            tr_peer *          peer = connections[i];
2293            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
2294            if( peer->pieceDataActivityDate )
2295                atom->numFails = 0;
2296            else
2297                ++atom->numFails;
2298            tordbg( t, "removing bad peer %s",
2299                   tr_peerIoGetAddrStr( peer->io ) );
2300            removePeer( t, peer );
2301        }
2302
2303        /* add some new ones */
2304        for( i = 0;    ( i < nCandidates )
2305           && ( i < MAX_RECONNECTIONS_PER_PULSE )
2306           && ( getPeerCount( t ) < getMaxPeerCount( t->tor ) )
2307           && ( newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND );
2308             ++i )
2309        {
2310            tr_peerMgr *       mgr = t->manager;
2311            struct peer_atom * atom = candidates[i];
2312            tr_peerIo *        io;
2313
2314            tordbg( t, "Starting an OUTGOING connection with %s",
2315                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2316
2317            io =
2318                tr_peerIoNewOutgoing( mgr->session, &atom->addr, atom->port,
2319                                      t->hash );
2320            if( io == NULL )
2321            {
2322                atom->myflags |= MYFLAG_UNREACHABLE;
2323            }
2324            else
2325            {
2326                tr_handshake * handshake = tr_handshakeNew(
2327                    io,
2328                    mgr->session->
2329                    encryptionMode,
2330                    myHandshakeDoneCB,
2331                    mgr );
2332
2333                assert( tr_peerIoGetTorrentHash( io ) );
2334
2335                ++newConnectionsThisSecond;
2336
2337                tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake,
2338                                         handshakeCompare );
2339            }
2340
2341            atom->time = time( NULL );
2342        }
2343
2344        /* cleanup */
2345        tr_free( connections );
2346        tr_free( candidates );
2347    }
2348
2349    torrentUnlock( t );
2350    return TRUE;
2351}
2352
2353/****
2354*****
2355*****  BANDWIDTH ALLOCATION
2356*****
2357****/
2358
2359static void
2360pumpAllPeers( tr_peerMgr * mgr )
2361{
2362    const int torrentCount = tr_ptrArraySize( mgr->torrents );
2363    int       i, j;
2364
2365    for( i = 0; i < torrentCount; ++i )
2366    {
2367        Torrent * t = tr_ptrArrayNth( mgr->torrents, i );
2368        for( j = 0; j < tr_ptrArraySize( t->peers ); ++j )
2369        {
2370            tr_peer * peer = tr_ptrArrayNth( t->peers, j );
2371            tr_peerMsgsPulse( peer->msgs );
2372        }
2373    }
2374}
2375
2376static void
2377setTorrentBandwidth( Torrent * t,
2378                     tr_direction dir,
2379                     tr_bandwidth * bandwidth )
2380{
2381    int i;
2382    int peerCount = 0;
2383    tr_peer ** peers = getConnectedPeers( t, &peerCount );
2384
2385    for( i=0; i<peerCount; ++i )
2386        tr_peerIoSetBandwidth( peers[i]->io, dir, bandwidth );
2387
2388    tr_free( peers );
2389}
2390
2391#if 0
2392#define DEBUG_DIRECTION TR_DOWN
2393#endif
2394
2395static double
2396bytesPerPulse( double KiB_per_second )
2397{
2398    return KiB_per_second * ( 1024.0 * BANDWIDTH_PERIOD_MSEC / 1000.0 );
2399}
2400
2401/*
2402 * @param currentSpeed current speed in KiB/s
2403 * @param desiredSpeed desired speed in KiB/s
2404 */
2405static double
2406allocateHowMuch( tr_direction dir UNUSED, double currentSpeed, double desiredSpeed )
2407{
2408    const int pulses_per_history = TR_RATECONTROL_HISTORY_MSEC / BANDWIDTH_PERIOD_MSEC;
2409    const double current_bytes_per_pulse = bytesPerPulse( currentSpeed );
2410    const double desired_bytes_per_pulse = bytesPerPulse( desiredSpeed );
2411    const double min = desired_bytes_per_pulse * 0.90;
2412    const double max = desired_bytes_per_pulse * 1.50;
2413    const double next_pulse_bytes = desired_bytes_per_pulse * ( pulses_per_history + 1 )
2414                                  - ( current_bytes_per_pulse * pulses_per_history );
2415    double clamped;
2416
2417    /* clamp the return value to lessen oscillation */
2418    clamped = next_pulse_bytes;
2419    clamped = MAX( clamped, min );
2420    clamped = MIN( clamped, max );
2421
2422#ifdef DEBUG_DIRECTION
2423if( dir == DEBUG_DIRECTION )
2424fprintf( stderr, "currentSpeed(%5.2f) desiredSpeed(%5.2f), allocating %5.2f (unclamped: %5.2f)\n",
2425         currentSpeed,
2426         desiredSpeed,
2427         clamped/1024.0,
2428         next_pulse_bytes/1024.0 );
2429#endif
2430
2431    return clamped;
2432}
2433
2434/**
2435 * Allocate bandwidth for each peer connection.
2436 *
2437 * @param mgr the peer manager
2438 * @param direction whether to allocate upload or download bandwidth
2439 */
2440static void
2441allocateBandwidth( tr_peerMgr * mgr,
2442                   tr_direction direction )
2443{
2444    int              i;
2445    tr_session     * session = mgr->session;
2446    const int        torrentCount = tr_ptrArraySize( mgr->torrents );
2447    Torrent       ** torrents = (Torrent **) tr_ptrArrayBase( mgr->torrents );
2448    tr_bandwidth   * global_pool = session->bandwidth[direction];
2449
2450    assert( mgr );
2451    assert( direction == TR_UP || direction == TR_DOWN );
2452
2453    /* before allocating bandwidth, pump the connected peers */
2454    pumpAllPeers( mgr );
2455
2456    for( i=0; i<torrentCount; ++i )
2457    {
2458        Torrent * t = torrents[i];
2459        tr_speedlimit speedMode;
2460
2461        /* no point in allocating bandwidth for stopped torrents */
2462        if( tr_torrentGetActivity( t->tor ) == TR_STATUS_STOPPED )
2463            continue;
2464
2465        /* if piece data is disallowed, don't bother limiting bandwidth --
2466         * we won't be asking for, or sending out, any pieces */
2467        if( !tr_torrentIsPieceTransferAllowed( t->tor, direction ) )
2468            speedMode = TR_SPEEDLIMIT_UNLIMITED;
2469        else
2470            speedMode = tr_torrentGetSpeedMode( t->tor, direction );
2471           
2472        /* process the torrent's peers based on its speed mode */
2473        switch( speedMode )
2474        {
2475            case TR_SPEEDLIMIT_UNLIMITED:
2476            {
2477                tr_bandwidth * b = t->tor->bandwidth[direction];
2478                tr_bandwidthSetUnlimited( b );
2479                setTorrentBandwidth( t, direction, b );
2480                break;
2481            }
2482
2483            case TR_SPEEDLIMIT_SINGLE:
2484            {
2485                tr_bandwidth * b = t->tor->bandwidth[direction];
2486                const double currentSpeed = tr_bandwidthGetPieceSpeed( b );
2487                const double desiredSpeed = tr_torrentGetSpeedLimit( t->tor, direction );
2488                const double bytesPerPulse = allocateHowMuch( direction, currentSpeed, desiredSpeed );
2489#ifdef DEBUG_DIRECTION
2490if( direction == DEBUG_DIRECTION )
2491fprintf( stderr, "single: currentSpeed %.0f ... desiredSpeed %.0f ... bytesPerPulse %.0f\n", currentSpeed, desiredSpeed, bytesPerPulse );
2492#endif
2493                tr_bandwidthSetLimited( b, bytesPerPulse );
2494                setTorrentBandwidth( t, direction, b );
2495                break;
2496            }
2497
2498            case TR_SPEEDLIMIT_GLOBAL:
2499            {
2500                setTorrentBandwidth( t, direction, global_pool );
2501                break;
2502            }
2503        }
2504    }
2505
2506    /* handle the global pool's connections */
2507    if( !tr_sessionIsSpeedLimitEnabled( session, direction ) )
2508        tr_bandwidthSetUnlimited( global_pool );
2509    else {
2510        const double currentSpeed = tr_bandwidthGetPieceSpeed( global_pool );
2511        const double desiredSpeed = tr_sessionGetSpeedLimit( session, direction );
2512        const double bytesPerPulse = allocateHowMuch( direction, currentSpeed, desiredSpeed );
2513#ifdef DEBUG_DIRECTION
2514if( direction == DEBUG_DIRECTION )
2515fprintf( stderr, "global(%p): currentSpeed %.0f ... desiredSpeed %.0f ... bytesPerPulse %.0f\n", global_pool, currentSpeed, desiredSpeed, bytesPerPulse );
2516#endif
2517        tr_bandwidthSetLimited( session->bandwidth[direction], bytesPerPulse );
2518    }
2519
2520    /* now that we've allocated bandwidth, pump all the connected peers */
2521    pumpAllPeers( mgr );
2522}
2523
2524static int
2525bandwidthPulse( void * vmgr )
2526{
2527    tr_peerMgr * mgr = vmgr;
2528    int          i;
2529
2530    managerLock( mgr );
2531
2532    /* allocate the upload and download bandwidth */
2533    for( i = 0; i < 2; ++i )
2534        allocateBandwidth( mgr, i );
2535
2536    managerUnlock( mgr );
2537    return TRUE;
2538}
Note: See TracBrowser for help on using the repository browser.