source: trunk/libtransmission/peer-mgr.c @ 7441

Last change on this file since 7441 was 7441, checked in by charles, 12 years ago

try to rework the bandwidth code yet again s.t. it satisfies all three: (1) fairly distributes bandwidth across all peers, (2) scales well in high-bandwidth situations, (3) is good at hitting and staying at bandwidth limits/goals

  • Property svn:keywords set to Date Rev Author Id
File size: 67.9 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 7441 2008-12-20 22:19:34Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h> /* memcpy, memcmp, strstr */
16#include <stdlib.h> /* qsort */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "bandwidth.h"
23#include "bencode.h"
24#include "blocklist.h"
25#include "clients.h"
26#include "completion.h"
27#include "crypto.h"
28#include "handshake.h"
29#include "inout.h" /* tr_ioTestPiece */
30#include "net.h"
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-mgr-private.h"
34#include "peer-msgs.h"
35#include "ptrarray.h"
36#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
37#include "torrent.h"
38#include "trevent.h"
39#include "utils.h"
40#include "webseed.h"
41
42enum
43{
44    /* how frequently to change which peers are choked */
45    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
46
47    /* minimum interval for refilling peers' request lists */
48    REFILL_PERIOD_MSEC = 333,
49
50    /* when many peers are available, keep idle ones this long */
51    MIN_UPLOAD_IDLE_SECS = ( 30 ),
52
53    /* when few peers are available, keep idle ones this long */
54    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
55
56    /* how frequently to decide which peers live and die */
57    RECONNECT_PERIOD_MSEC = ( 2 * 1000 ),
58   
59    /* how frequently to reallocate bandwidth */
60    BANDWIDTH_PERIOD_MSEC = 500,
61
62    /* max # of peers to ask fer per torrent per reconnect pulse */
63    MAX_RECONNECTIONS_PER_PULSE = 4,
64
65    /* max number of peers to ask for per second overall.
66    * this throttle is to avoid overloading the router */
67    MAX_CONNECTIONS_PER_SECOND = 8,
68
69    /* number of unchoked peers per torrent.
70     * FIXME: this probably ought to be configurable */
71    MAX_UNCHOKED_PEERS = 14,
72
73    /* number of bad pieces a peer is allowed to send before we ban them */
74    MAX_BAD_PIECES_PER_PEER = 5,
75
76    /* use for bitwise operations w/peer_atom.myflags */
77    MYFLAG_BANNED = 1,
78
79    /* unreachable for now... but not banned.
80     * if they try to connect to us it's okay */
81    MYFLAG_UNREACHABLE = 2,
82
83    /* the minimum we'll wait before attempting to reconnect to a peer */
84    MINIMUM_RECONNECT_INTERVAL_SECS = 5
85};
86
87
88/**
89***
90**/
91
92enum
93{
94    UPLOAD_ONLY_UKNOWN,
95    UPLOAD_ONLY_YES,
96    UPLOAD_ONLY_NO
97};
98
99/**
100 * Peer information that should be kept even before we've connected and
101 * after we've disconnected.  These are kept in a pool of peer_atoms to decide
102 * which ones would make good candidates for connecting to, and to watch out
103 * for banned peers.
104 *
105 * @see tr_peer
106 * @see tr_peermsgs
107 */
108struct peer_atom
109{
110    uint8_t     from;
111    uint8_t     flags;       /* these match the added_f flags */
112    uint8_t     myflags;     /* flags that aren't defined in added_f */
113    uint8_t     uploadOnly;  /* UPLOAD_ONLY_ */
114    tr_port     port;
115    uint16_t    numFails;
116    tr_address  addr;
117    time_t      time;        /* when the peer's connection status last changed */
118    time_t      piece_data_time;
119};
120
121typedef struct
122{
123    tr_bool         isRunning;
124
125    uint8_t         hash[SHA_DIGEST_LENGTH];
126    int         *   pendingRequestCount;
127    tr_ptrArray *   outgoingHandshakes; /* tr_handshake */
128    tr_ptrArray *   pool; /* struct peer_atom */
129    tr_ptrArray *   peers; /* tr_peer */
130    tr_ptrArray *   webseeds; /* tr_webseed */
131    tr_timer *      reconnectTimer;
132    tr_timer *      rechokeTimer;
133    tr_timer *      refillTimer;
134    tr_torrent *    tor;
135    tr_peer *       optimistic; /* the optimistic peer, or NULL if none */
136
137    struct tr_peerMgr * manager;
138}
139Torrent;
140
141struct tr_peerMgr
142{
143    tr_session      * session;
144    tr_ptrArray     * torrents; /* Torrent */
145    tr_ptrArray     * incomingHandshakes; /* tr_handshake */
146    tr_ptrArray     * finishedHandshakes; /* tr_handshake */
147    tr_timer        * bandwidthTimer;
148};
149
150#define tordbg( t, ... ) \
151    do { \
152        if( tr_deepLoggingIsActive( ) ) \
153            tr_deepLog( __FILE__, __LINE__, t->tor->info.name, __VA_ARGS__ ); \
154    } while( 0 )
155
156#define dbgmsg( ... ) \
157    do { \
158        if( tr_deepLoggingIsActive( ) ) \
159            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
160    } while( 0 )
161
162/**
163***
164**/
165
166static void
167managerLock( const struct tr_peerMgr * manager )
168{
169    tr_globalLock( manager->session );
170}
171
172static void
173managerUnlock( const struct tr_peerMgr * manager )
174{
175    tr_globalUnlock( manager->session );
176}
177
178static void
179torrentLock( Torrent * torrent )
180{
181    managerLock( torrent->manager );
182}
183
184static void
185torrentUnlock( Torrent * torrent )
186{
187    managerUnlock( torrent->manager );
188}
189
190static int
191torrentIsLocked( const Torrent * t )
192{
193    return tr_globalIsLocked( t->manager->session );
194}
195
196/**
197***
198**/
199
200static int
201handshakeCompareToAddr( const void * va,
202                        const void * vb )
203{
204    const tr_handshake * a = va;
205
206    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
207}
208
209static int
210handshakeCompare( const void * a,
211                  const void * b )
212{
213    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
214}
215
216static tr_handshake*
217getExistingHandshake( tr_ptrArray      * handshakes,
218                      const tr_address * addr )
219{
220    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
221}
222
223static int
224comparePeerAtomToAddress( const void * va, const void * vb )
225{
226    const struct peer_atom * a = va;
227
228    return tr_compareAddresses( &a->addr, vb );
229}
230
231static int
232comparePeerAtoms( const void * va, const void * vb )
233{
234    const struct peer_atom * b = vb;
235
236    return comparePeerAtomToAddress( va, &b->addr );
237}
238
239/**
240***
241**/
242
243static int
244torrentCompare( const void * va,
245                const void * vb )
246{
247    const Torrent * a = va;
248    const Torrent * b = vb;
249
250    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
251}
252
253static int
254torrentCompareToHash( const void * va,
255                      const void * vb )
256{
257    const Torrent * a = va;
258    const uint8_t * b_hash = vb;
259
260    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
261}
262
263static Torrent*
264getExistingTorrent( tr_peerMgr *    manager,
265                    const uint8_t * hash )
266{
267    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
268                                             hash,
269                                             torrentCompareToHash );
270}
271
272static int
273peerCompare( const void * va, const void * vb )
274{
275    const tr_peer * a = va;
276    const tr_peer * b = vb;
277
278    return tr_compareAddresses( &a->addr, &b->addr );
279}
280
281static int
282peerCompareToAddr( const void * va, const void * vb )
283{
284    const tr_peer * a = va;
285
286    return tr_compareAddresses( &a->addr, vb );
287}
288
289static tr_peer*
290getExistingPeer( Torrent          * torrent,
291                 const tr_address * addr )
292{
293    assert( torrentIsLocked( torrent ) );
294    assert( addr );
295
296    return tr_ptrArrayFindSorted( torrent->peers, addr, peerCompareToAddr );
297}
298
299static struct peer_atom*
300getExistingAtom( const Torrent    * t,
301                 const tr_address * addr )
302{
303    assert( torrentIsLocked( t ) );
304    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
305}
306
307static tr_bool
308peerIsInUse( const Torrent    * ct,
309             const tr_address * addr )
310{
311    Torrent * t = (Torrent*) ct;
312
313    assert( torrentIsLocked ( t ) );
314
315    return getExistingPeer( t, addr )
316        || getExistingHandshake( t->outgoingHandshakes, addr )
317        || getExistingHandshake( t->manager->incomingHandshakes, addr );
318}
319
320static tr_peer*
321peerConstructor( tr_torrent * tor, const tr_address * addr )
322{
323    tr_peer * p;
324    p = tr_new0( tr_peer, 1 );
325    p->addr = *addr;
326    p->bandwidth = tr_bandwidthNew( tor->session, tor->bandwidth );
327    return p;
328}
329
330static tr_peer*
331getPeer( Torrent          * torrent,
332         const tr_address * addr )
333{
334    tr_peer * peer;
335
336    assert( torrentIsLocked( torrent ) );
337
338    peer = getExistingPeer( torrent, addr );
339
340    if( peer == NULL )
341    {
342        peer = peerConstructor( torrent->tor, addr );
343        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
344    }
345
346    return peer;
347}
348
349static void
350peerDestructor( tr_peer * peer )
351{
352    assert( peer );
353
354    if( peer->msgs != NULL )
355    {
356        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
357        tr_peerMsgsFree( peer->msgs );
358    }
359
360    tr_peerIoFree( peer->io );
361
362    tr_bitfieldFree( peer->have );
363    tr_bitfieldFree( peer->blame );
364    tr_free( peer->client );
365
366    tr_bandwidthFree( peer->bandwidth );
367
368    tr_free( peer );
369}
370
371static void
372removePeer( Torrent * t,
373            tr_peer * peer )
374{
375    tr_peer *          removed;
376    struct peer_atom * atom;
377
378    assert( torrentIsLocked( t ) );
379
380    atom = getExistingAtom( t, &peer->addr );
381    assert( atom );
382    atom->time = time( NULL );
383
384    removed = tr_ptrArrayRemoveSorted( t->peers, peer, peerCompare );
385    assert( removed == peer );
386    peerDestructor( removed );
387}
388
389static void
390removeAllPeers( Torrent * t )
391{
392    while( !tr_ptrArrayEmpty( t->peers ) )
393        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
394}
395
396static void
397torrentDestructor( void * vt )
398{
399    Torrent * t = vt;
400    uint8_t   hash[SHA_DIGEST_LENGTH];
401
402    assert( t );
403    assert( !t->isRunning );
404    assert( t->peers );
405    assert( torrentIsLocked( t ) );
406    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
407    assert( tr_ptrArrayEmpty( t->peers ) );
408
409    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
410
411    tr_timerFree( &t->reconnectTimer );
412    tr_timerFree( &t->rechokeTimer );
413    tr_timerFree( &t->refillTimer );
414
415    tr_ptrArrayFree( t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
416    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
417    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
418    tr_ptrArrayFree( t->peers, NULL );
419
420    tr_free( t->pendingRequestCount );
421    tr_free( t );
422}
423
424static void peerCallbackFunc( void * vpeer,
425                              void * vevent,
426                              void * vt );
427
428static Torrent*
429torrentConstructor( tr_peerMgr * manager,
430                    tr_torrent * tor )
431{
432    int       i;
433    Torrent * t;
434
435    t = tr_new0( Torrent, 1 );
436    t->manager = manager;
437    t->tor = tor;
438    t->pool = tr_ptrArrayNew( );
439    t->peers = tr_ptrArrayNew( );
440    t->webseeds = tr_ptrArrayNew( );
441    t->outgoingHandshakes = tr_ptrArrayNew( );
442    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
443
444    for( i = 0; i < tor->info.webseedCount; ++i )
445    {
446        tr_webseed * w =
447            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc,
448                           t );
449        tr_ptrArrayAppend( t->webseeds, w );
450    }
451
452    return t;
453}
454
455
456static int bandwidthPulse( void * vmgr );
457
458
459tr_peerMgr*
460tr_peerMgrNew( tr_session * session )
461{
462    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
463
464    m->session = session;
465    m->torrents = tr_ptrArrayNew( );
466    m->incomingHandshakes = tr_ptrArrayNew( );
467    m->finishedHandshakes = tr_ptrArrayNew( );
468    m->bandwidthTimer = tr_timerNew( session, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
469    return m;
470}
471
472void
473tr_peerMgrFree( tr_peerMgr * manager )
474{
475    tr_handshake * handshake;
476
477    managerLock( manager );
478
479    tr_timerFree( &manager->bandwidthTimer );
480
481    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
482     * the item from manager->handshakes, so this is a little roundabout... */
483    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
484        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
485
486    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
487
488    while(( handshake = tr_ptrArrayPop( manager->finishedHandshakes )))
489        tr_handshakeFree( handshake );
490
491    tr_ptrArrayFree( manager->finishedHandshakes, NULL );
492
493    /* free the torrents. */
494    tr_ptrArrayFree( manager->torrents, torrentDestructor );
495
496    managerUnlock( manager );
497    tr_free( manager );
498}
499
500static tr_peer**
501getConnectedPeers( Torrent * t, int * setmeCount )
502{
503    int i, peerCount, connectionCount;
504    tr_peer **peers;
505    tr_peer **ret;
506
507    assert( torrentIsLocked( t ) );
508
509    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
510    ret = tr_new( tr_peer *, peerCount );
511
512    for( i = connectionCount = 0; i < peerCount; ++i )
513        if( peers[i]->msgs )
514            ret[connectionCount++] = peers[i];
515
516    *setmeCount = connectionCount;
517    return ret;
518}
519
520static int
521clientIsDownloadingFrom( const tr_peer * peer )
522{
523    return peer->clientIsInterested && !peer->clientIsChoked;
524}
525
526static int
527clientIsUploadingTo( const tr_peer * peer )
528{
529    return peer->peerIsInterested && !peer->peerIsChoked;
530}
531
532/***
533****
534***/
535
536tr_bool
537tr_peerMgrPeerIsSeed( const tr_peerMgr   * mgr,
538                      const uint8_t      * torrentHash,
539                      const tr_address   * addr )
540{
541    tr_bool isSeed = FALSE;
542    const Torrent * t = NULL;
543    const struct peer_atom * atom = NULL;
544
545    t = getExistingTorrent( (tr_peerMgr*)mgr, torrentHash );
546    if( t )
547        atom = getExistingAtom( t, addr );
548    if( atom )
549        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
550
551    return isSeed;
552}
553
554/****
555*****
556*****  REFILL
557*****
558****/
559
560static void
561assertValidPiece( Torrent * t, tr_piece_index_t piece )
562{
563    assert( t );
564    assert( t->tor );
565    assert( piece < t->tor->info.pieceCount );
566}
567
568static int
569getPieceRequests( Torrent * t, tr_piece_index_t piece )
570{
571    assertValidPiece( t, piece );
572
573    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
574}
575
576static void
577incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
578{
579    assertValidPiece( t, piece );
580
581    if( t->pendingRequestCount == NULL )
582        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
583    t->pendingRequestCount[piece]++;
584}
585
586static void
587decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
588{
589    assertValidPiece( t, piece );
590
591    if( t->pendingRequestCount )
592        t->pendingRequestCount[piece]--;
593}
594
595struct tr_refill_piece
596{
597    tr_priority_t    priority;
598    uint32_t         piece;
599    uint32_t         peerCount;
600    int              random;
601    int              pendingRequestCount;
602    int              missingBlockCount;
603};
604
605static int
606compareRefillPiece( const void * aIn, const void * bIn )
607{
608    const struct tr_refill_piece * a = aIn;
609    const struct tr_refill_piece * b = bIn;
610
611    /* if one piece has a higher priority, it goes first */
612    if( a->priority != b->priority )
613        return a->priority > b->priority ? -1 : 1;
614
615    /* have a per-priority endgame */
616    if( a->pendingRequestCount != b->pendingRequestCount )
617        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
618
619    /* fewer missing pieces goes first */
620    if( a->missingBlockCount != b->missingBlockCount )
621        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
622
623    /* otherwise if one has fewer peers, it goes first */
624    if( a->peerCount != b->peerCount )
625        return a->peerCount < b->peerCount ? -1 : 1;
626
627    /* otherwise go with our random seed */
628    if( a->random != b->random )
629        return a->random < b->random ? -1 : 1;
630
631    return 0;
632}
633
634static tr_piece_index_t *
635getPreferredPieces( Torrent * t, tr_piece_index_t  * pieceCount )
636{
637    const tr_torrent  * tor = t->tor;
638    const tr_info     * inf = &tor->info;
639    tr_piece_index_t    i;
640    tr_piece_index_t    poolSize = 0;
641    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
642    int                 peerCount;
643    tr_peer**           peers;
644
645    assert( torrentIsLocked( t ) );
646
647    peers = getConnectedPeers( t, &peerCount );
648
649    /* make a list of the pieces that we want but don't have */
650    for( i = 0; i < inf->pieceCount; ++i )
651        if( !tor->info.pieces[i].dnd
652                && !tr_cpPieceIsComplete( tor->completion, i ) )
653            pool[poolSize++] = i;
654
655    /* sort the pool by which to request next */
656    if( poolSize > 1 )
657    {
658        tr_piece_index_t j;
659        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
660
661        for( j = 0; j < poolSize; ++j )
662        {
663            int k;
664            const tr_piece_index_t piece = pool[j];
665            struct tr_refill_piece * setme = p + j;
666
667            setme->piece = piece;
668            setme->priority = inf->pieces[piece].priority;
669            setme->peerCount = 0;
670            setme->random = tr_cryptoWeakRandInt( INT_MAX );
671            setme->pendingRequestCount = getPieceRequests( t, piece );
672            setme->missingBlockCount
673                         = tr_cpMissingBlocksInPiece( tor->completion, piece );
674
675            for( k = 0; k < peerCount; ++k )
676            {
677                const tr_peer * peer = peers[k];
678                if( peer->peerIsInterested
679                        && !peer->clientIsChoked
680                        && tr_bitfieldHas( peer->have, piece ) )
681                    ++setme->peerCount;
682            }
683        }
684
685        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
686               compareRefillPiece );
687
688        for( j = 0; j < poolSize; ++j )
689            pool[j] = p[j].piece;
690
691        tr_free( p );
692    }
693
694    tr_free( peers );
695
696    *pieceCount = poolSize;
697    return pool;
698}
699
700struct tr_blockIterator
701{
702    Torrent * t;
703    tr_block_index_t blockIndex, blockCount, *blocks;
704    tr_piece_index_t pieceIndex, pieceCount, *pieces;
705};
706
707static struct tr_blockIterator*
708blockIteratorNew( Torrent * t )
709{
710    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
711    i->t = t;
712    i->pieces = getPreferredPieces( t, &i->pieceCount );
713    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCount );
714    return i;
715}
716
717static int
718blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
719{
720    int found;
721    Torrent * t = i->t;
722    tr_torrent * tor = t->tor;
723
724    while( ( i->blockIndex == i->blockCount )
725        && ( i->pieceIndex < i->pieceCount ) )
726    {
727        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
728        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
729        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
730        tr_block_index_t block;
731
732        assert( index < tor->info.pieceCount );
733
734        i->blockCount = 0;
735        i->blockIndex = 0;
736        for( block=b; block!=e; ++block )
737            if( !tr_cpBlockIsComplete( tor->completion, block ) )
738                i->blocks[i->blockCount++] = block;
739    }
740
741    if(( found = ( i->blockIndex < i->blockCount )))
742        *setme = i->blocks[i->blockIndex++];
743
744    return found;
745}
746
747static void
748blockIteratorFree( struct tr_blockIterator * i )
749{
750    tr_free( i->blocks );
751    tr_free( i->pieces );
752    tr_free( i );
753}
754
755static tr_peer**
756getPeersUploadingToClient( Torrent * t,
757                           int *     setmeCount )
758{
759    int j;
760    int peerCount = 0;
761    int retCount = 0;
762    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
763    tr_peer ** ret = tr_new( tr_peer *, peerCount );
764
765    j = 0; /* this is a temporary test to make sure we walk through all the peers */
766    if( peerCount )
767    {
768        /* Get a list of peers we're downloading from.
769           Pick a different starting point each time so all peers
770           get a chance at being the first in line */
771        const int fencepost = tr_cryptoWeakRandInt( peerCount );
772        int i = fencepost;
773        do {
774            if( clientIsDownloadingFrom( peers[i] ) )
775                ret[retCount++] = peers[i];
776            i = ( i + 1 ) % peerCount;
777            ++j;
778        } while( i != fencepost );
779    }
780    assert( j == peerCount );
781    *setmeCount = retCount;
782    return ret;
783}
784
785static uint32_t
786getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
787{
788    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
789    const uint64_t blockPos = tor->blockSize * b;
790    assert( blockPos >= piecePos );
791    return (uint32_t)( blockPos - piecePos );
792}
793
794static int
795refillPulse( void * vtorrent )
796{
797    tr_block_index_t block;
798    int peerCount;
799    int webseedCount;
800    tr_peer ** peers;
801    tr_webseed ** webseeds;
802    struct tr_blockIterator * blockIterator;
803    Torrent * t = vtorrent;
804    tr_torrent * tor = t->tor;
805
806    if( !t->isRunning )
807        return TRUE;
808    if( tr_torrentIsSeed( t->tor ) )
809        return TRUE;
810
811    torrentLock( t );
812    tordbg( t, "Refilling Request Buffers..." );
813
814    blockIterator = blockIteratorNew( t );
815    peers = getPeersUploadingToClient( t, &peerCount );
816    webseedCount = tr_ptrArraySize( t->webseeds );
817    webseeds = tr_memdup( tr_ptrArrayBase( t->webseeds ),
818                          webseedCount * sizeof( tr_webseed* ) );
819
820    while( ( webseedCount || peerCount )
821        && blockIteratorNext( blockIterator, &block ) )
822    {
823        int j;
824        int handled = FALSE;
825
826        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
827        const uint32_t offset = getBlockOffsetInPiece( tor, block );
828        const uint32_t length = tr_torBlockCountBytes( tor, block );
829
830        /* find a peer who can ask for this block */
831        for( j=0; !handled && j<peerCount; )
832        {
833            const tr_addreq_t val = tr_peerMsgsAddRequest( peers[j]->msgs, index, offset, length, FALSE );
834            switch( val )
835            {
836                case TR_ADDREQ_FULL:
837                case TR_ADDREQ_CLIENT_CHOKED:
838                    peers[j] = peers[--peerCount];
839                    break;
840
841                case TR_ADDREQ_MISSING:
842                case TR_ADDREQ_DUPLICATE:
843                    ++j;
844                    break;
845
846                case TR_ADDREQ_OK:
847                    incrementPieceRequests( t, index );
848                    handled = TRUE;
849                    break;
850
851                default:
852                    assert( 0 && "unhandled value" );
853                    break;
854            }
855        }
856
857        /* maybe one of the webseeds can do it */
858        for( j=0; !handled && j<webseedCount; )
859        {
860            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j],
861                                                          index, offset, length );
862            switch( val )
863            {
864                case TR_ADDREQ_FULL:
865                    webseeds[j] = webseeds[--webseedCount];
866                    break;
867
868                case TR_ADDREQ_OK:
869                    incrementPieceRequests( t, index );
870                    handled = TRUE;
871                    break;
872
873                default:
874                    assert( 0 && "unhandled value" );
875                    break;
876            }
877        }
878    }
879
880    /* cleanup */
881    blockIteratorFree( blockIterator );
882    tr_free( webseeds );
883    tr_free( peers );
884
885    t->refillTimer = NULL;
886    torrentUnlock( t );
887    return FALSE;
888}
889
890static void
891broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
892{
893    int i, size;
894    tr_peer ** peers;
895
896    assert( torrentIsLocked( t ) );
897
898    tordbg( t, "got a block; cancelling any duplicate requests from peers %"PRIu32":%"PRIu32"->%"PRIu32, index, offset, length );
899    peers = getConnectedPeers( t, &size );
900    for( i=0; i<size; ++i )
901        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
902    tr_free( peers );
903}
904
905static void
906addStrike( Torrent * t,
907           tr_peer * peer )
908{
909    tordbg( t, "increasing peer %s strike count to %d",
910            tr_peerIoAddrStr( &peer->addr,
911                              peer->port ), peer->strikes + 1 );
912
913    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
914    {
915        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
916        atom->myflags |= MYFLAG_BANNED;
917        peer->doPurge = 1;
918        tordbg( t, "banning peer %s",
919               tr_peerIoAddrStr( &atom->addr, atom->port ) );
920    }
921}
922
923static void
924gotBadPiece( Torrent *        t,
925             tr_piece_index_t pieceIndex )
926{
927    tr_torrent *   tor = t->tor;
928    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
929
930    tor->corruptCur += byteCount;
931    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
932}
933
934static void
935refillSoon( Torrent * t )
936{
937    if( t->refillTimer == NULL )
938        t->refillTimer = tr_timerNew( t->manager->session,
939                                      refillPulse, t,
940                                      REFILL_PERIOD_MSEC );
941}
942
943static void
944peerSuggestedPiece( Torrent            * t,
945                    tr_peer            * peer,
946                    tr_piece_index_t     pieceIndex,
947                    int                  isFastAllowed )
948{
949    assert( t );
950    assert( peer );
951    assert( peer->msgs );
952
953    /* is this a valid piece? */
954    if(  pieceIndex >= t->tor->info.pieceCount )
955        return;
956
957    /* don't ask for it if we've already got it */
958    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
959        return;
960
961    /* don't ask for it if they don't have it */
962    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
963        return;
964
965    /* don't ask for it if we're choked and it's not fast */
966    if( !isFastAllowed && peer->clientIsChoked )
967        return;
968
969    /* request the blocks that we don't have in this piece */
970    {
971        tr_block_index_t block;
972        const tr_torrent * tor = t->tor;
973        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
974        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
975
976        for( block=start; block<end; ++block )
977        {
978            if( !tr_cpBlockIsComplete( tor->completion, block ) )
979            {
980                const uint32_t offset = getBlockOffsetInPiece( tor, block );
981                const uint32_t length = tr_torBlockCountBytes( tor, block );
982                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length, TRUE );
983                incrementPieceRequests( t, pieceIndex );
984            }
985        }
986    }
987}
988
989static void
990peerCallbackFunc( void * vpeer,
991                  void * vevent,
992                  void * vt )
993{
994    tr_peer             * peer = vpeer; /* may be NULL if peer is a webseed */
995    Torrent             * t = vt;
996    const tr_peer_event * e = vevent;
997
998    torrentLock( t );
999
1000    switch( e->eventType )
1001    {
1002        case TR_PEER_UPLOAD_ONLY:
1003            /* update our atom */
1004            if( peer ) {
1005                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1006                a->uploadOnly = e->uploadOnly ? UPLOAD_ONLY_YES : UPLOAD_ONLY_NO;
1007            }
1008            break;
1009
1010        case TR_PEER_NEED_REQ:
1011            refillSoon( t );
1012            break;
1013
1014        case TR_PEER_CANCEL:
1015            decrementPieceRequests( t, e->pieceIndex );
1016            break;
1017
1018        case TR_PEER_PEER_GOT_DATA:
1019        {
1020            const time_t now = time( NULL );
1021            tr_torrent * tor = t->tor;
1022
1023            tor->activityDate = now;
1024
1025            if( e->wasPieceData )
1026                tor->uploadedCur += e->length;
1027
1028            /* update the stats */
1029            if( e->wasPieceData )
1030                tr_statsAddUploaded( tor->session, e->length );
1031
1032            /* update our atom */
1033            if( peer ) {
1034                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1035                if( e->wasPieceData )
1036                    a->piece_data_time = now;
1037            }
1038
1039            break;
1040        }
1041
1042        case TR_PEER_CLIENT_GOT_SUGGEST:
1043            if( peer )
1044                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1045            break;
1046
1047        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1048            if( peer )
1049                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1050            break;
1051
1052        case TR_PEER_CLIENT_GOT_DATA:
1053        {
1054            const time_t now = time( NULL );
1055            tr_torrent * tor = t->tor;
1056            tor->activityDate = now;
1057
1058            /* only add this to downloadedCur if we got it from a peer --
1059             * webseeds shouldn't count against our ratio.  As one tracker
1060             * admin put it, "Those pieces are downloaded directly from the
1061             * content distributor, not the peers, it is the tracker's job
1062             * to manage the swarms, not the web server and does not fit
1063             * into the jurisdiction of the tracker." */
1064            if( peer && e->wasPieceData )
1065                tor->downloadedCur += e->length;
1066
1067            /* update the stats */ 
1068            if( e->wasPieceData )
1069                tr_statsAddDownloaded( tor->session, e->length );
1070
1071            /* update our atom */
1072            if( peer ) {
1073                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1074                if( e->wasPieceData )
1075                    a->piece_data_time = now;
1076            }
1077
1078            break;
1079        }
1080
1081        case TR_PEER_PEER_PROGRESS:
1082        {
1083            if( peer )
1084            {
1085                struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1086                const int peerIsSeed = e->progress >= 1.0;
1087                if( peerIsSeed ) {
1088                    tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1089                    atom->flags |= ADDED_F_SEED_FLAG;
1090                } else {
1091                    tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1092                    atom->flags &= ~ADDED_F_SEED_FLAG;
1093                }
1094            }
1095            break;
1096        }
1097
1098        case TR_PEER_CLIENT_GOT_BLOCK:
1099        {
1100            tr_torrent *     tor = t->tor;
1101
1102            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1103
1104            tr_cpBlockAdd( tor->completion, block );
1105            decrementPieceRequests( t, e->pieceIndex );
1106
1107            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1108
1109            if( tr_cpPieceIsComplete( tor->completion, e->pieceIndex ) )
1110            {
1111                const tr_piece_index_t p = e->pieceIndex;
1112                const tr_bool ok = tr_ioTestPiece( tor, p );
1113
1114                if( !ok )
1115                {
1116                    tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1117                               (unsigned long)p );
1118                }
1119
1120                tr_torrentSetHasPiece( tor, p, ok );
1121                tr_torrentSetPieceChecked( tor, p, TRUE );
1122                tr_peerMgrSetBlame( tor->session->peerMgr, tor->info.hash, p, ok );
1123
1124                if( !ok )
1125                    gotBadPiece( t, p );
1126                else
1127                {
1128                    int i, peerCount;
1129                    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1130                    for( i = 0; i < peerCount; ++i )
1131                        tr_peerMsgsHave( peers[i]->msgs, p );
1132                    tr_free( peers );
1133                }
1134
1135                tr_torrentRecheckCompleteness( tor );
1136            }
1137            break;
1138        }
1139
1140        case TR_PEER_ERROR:
1141            if( e->err == EINVAL )
1142            {
1143                addStrike( t, peer );
1144                peer->doPurge = 1;
1145                tordbg( t, "setting doPurge because we got an EINVAL error" );
1146            }
1147            else if( ( e->err == ERANGE )
1148                  || ( e->err == EMSGSIZE )
1149                  || ( e->err == ENOTCONN ) )
1150            {
1151                /* some protocol error from the peer */
1152                peer->doPurge = 1;
1153                tordbg( t, "setting doPurge because we got an ERANGE, EMSGSIZE, or ENOTCONN error" );
1154            }
1155            else /* a local error, such as an IO error */
1156            {
1157                t->tor->error = e->err;
1158                tr_strlcpy( t->tor->errorString,
1159                            tr_strerror( t->tor->error ),
1160                            sizeof( t->tor->errorString ) );
1161                tr_torrentStop( t->tor );
1162            }
1163            break;
1164
1165        default:
1166            assert( 0 );
1167    }
1168
1169    torrentUnlock( t );
1170}
1171
1172static void
1173ensureAtomExists( Torrent          * t,
1174                  const tr_address * addr,
1175                  tr_port            port,
1176                  uint8_t            flags,
1177                  uint8_t            from )
1178{
1179    if( getExistingAtom( t, addr ) == NULL )
1180    {
1181        struct peer_atom * a;
1182        a = tr_new0( struct peer_atom, 1 );
1183        a->addr = *addr;
1184        a->port = port;
1185        a->flags = flags;
1186        a->from = from;
1187        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr( &a->addr, a->port ) );
1188        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
1189    }
1190}
1191
1192static int
1193getMaxPeerCount( const tr_torrent * tor )
1194{
1195    return tor->maxConnectedPeers;
1196}
1197
1198static int
1199getPeerCount( const Torrent * t )
1200{
1201    return tr_ptrArraySize( t->peers ) + tr_ptrArraySize( t->outgoingHandshakes );
1202}
1203
1204/* FIXME: this is kind of a mess. */
1205static tr_bool
1206myHandshakeDoneCB( tr_handshake  *  handshake,
1207                   tr_peerIo     * io,
1208                   int             isConnected,
1209                   const uint8_t * peer_id,
1210                   void          * vmanager )
1211{
1212    tr_bool            ok = isConnected;
1213    tr_bool            success = FALSE;
1214    tr_port            port;
1215    const tr_address * addr;
1216    tr_peerMgr       * manager = vmanager;
1217    Torrent          * t;
1218    tr_handshake     * ours;
1219
1220    assert( io );
1221    assert( isConnected == 0 || isConnected == 1 );
1222
1223    t = tr_peerIoHasTorrentHash( io )
1224        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1225        : NULL;
1226
1227    if( tr_peerIoIsIncoming ( io ) )
1228        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
1229                                        handshake, handshakeCompare );
1230    else if( t )
1231        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
1232                                        handshake, handshakeCompare );
1233    else
1234        ours = handshake;
1235
1236    assert( ours );
1237    assert( ours == handshake );
1238
1239    if( t )
1240        torrentLock( t );
1241
1242    addr = tr_peerIoGetAddress( io, &port );
1243
1244    if( !ok || !t || !t->isRunning )
1245    {
1246        if( t )
1247        {
1248            struct peer_atom * atom = getExistingAtom( t, addr );
1249            if( atom )
1250                ++atom->numFails;
1251        }
1252    }
1253    else /* looking good */
1254    {
1255        struct peer_atom * atom;
1256        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1257        atom = getExistingAtom( t, addr );
1258        atom->time = time( NULL );
1259        atom->piece_data_time = 0;
1260
1261        if( atom->myflags & MYFLAG_BANNED )
1262        {
1263            tordbg( t, "banned peer %s tried to reconnect",
1264                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
1265        }
1266        else if( tr_peerIoIsIncoming( io )
1267               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1268
1269        {
1270        }
1271        else
1272        {
1273            tr_peer * peer = getExistingPeer( t, addr );
1274
1275            if( peer ) /* we already have this peer */
1276            {
1277            }
1278            else
1279            {
1280                peer = getPeer( t, addr );
1281                tr_free( peer->client );
1282
1283                if( !peer_id )
1284                    peer->client = NULL;
1285                else {
1286                    char client[128];
1287                    tr_clientForId( client, sizeof( client ), peer_id );
1288                    peer->client = tr_strdup( client );
1289                }
1290
1291                peer->port = port;
1292                peer->io = tr_handshakeStealIO( handshake );
1293                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1294                tr_peerIoSetBandwidth( io, peer->bandwidth );
1295
1296                success = TRUE;
1297            }
1298        }
1299    }
1300
1301    if( !success )
1302        tr_ptrArrayAppend( manager->finishedHandshakes, handshake );
1303
1304    if( t )
1305        torrentUnlock( t );
1306
1307    return success;
1308}
1309
1310void
1311tr_peerMgrAddIncoming( tr_peerMgr * manager,
1312                       tr_address * addr,
1313                       tr_port      port,
1314                       int          socket )
1315{
1316    managerLock( manager );
1317
1318    if( tr_sessionIsAddressBlocked( manager->session, addr ) )
1319    {
1320        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
1321        tr_netClose( socket );
1322    }
1323    else if( getExistingHandshake( manager->incomingHandshakes, addr ) )
1324    {
1325        tr_netClose( socket );
1326    }
1327    else /* we don't have a connetion to them yet... */
1328    {
1329        tr_peerIo *    io;
1330        tr_handshake * handshake;
1331
1332        io = tr_peerIoNewIncoming( manager->session, addr, port, socket );
1333
1334        handshake = tr_handshakeNew( io,
1335                                     manager->session->encryptionMode,
1336                                     myHandshakeDoneCB,
1337                                     manager );
1338
1339        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake,
1340                                 handshakeCompare );
1341    }
1342
1343    managerUnlock( manager );
1344}
1345
1346static int
1347tr_isPex( const tr_pex * pex )
1348{
1349    return pex && tr_isAddress( &pex->addr );
1350}
1351
1352void
1353tr_peerMgrAddPex( tr_peerMgr *    manager,
1354                  const uint8_t * torrentHash,
1355                  uint8_t         from,
1356                  const tr_pex *  pex )
1357{
1358    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
1359    {
1360        Torrent * t;
1361        managerLock( manager );
1362
1363
1364        t = getExistingTorrent( manager, torrentHash );
1365        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
1366            ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from );
1367   
1368        managerUnlock( manager );
1369    }
1370}
1371
1372tr_pex *
1373tr_peerMgrCompactToPex( const void *    compact,
1374                        size_t          compactLen,
1375                        const uint8_t * added_f,
1376                        size_t          added_f_len,
1377                        size_t *        pexCount )
1378{
1379    size_t          i;
1380    size_t          n = compactLen / 6;
1381    const uint8_t * walk = compact;
1382    tr_pex *        pex = tr_new0( tr_pex, n );
1383
1384    for( i = 0; i < n; ++i )
1385    {
1386        pex[i].addr.type = TR_AF_INET;
1387        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
1388        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1389        if( added_f && ( n == added_f_len ) )
1390            pex[i].flags = added_f[i];
1391    }
1392
1393    *pexCount = n;
1394    return pex;
1395}
1396
1397tr_pex *
1398tr_peerMgrCompact6ToPex( const void    * compact,
1399                         size_t          compactLen,
1400                         const uint8_t * added_f,
1401                         size_t          added_f_len,
1402                         size_t        * pexCount )
1403{
1404    size_t          i;
1405    size_t          n = compactLen / 18;
1406    const uint8_t * walk = compact;
1407    tr_pex *        pex = tr_new0( tr_pex, n );
1408   
1409    for( i = 0; i < n; ++i )
1410    {
1411        pex[i].addr.type = TR_AF_INET6;
1412        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
1413        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1414        if( added_f && ( n == added_f_len ) )
1415            pex[i].flags = added_f[i];
1416    }
1417   
1418    *pexCount = n;
1419    return pex;
1420}
1421
1422tr_pex *
1423tr_peerMgrArrayToPex( const void * array,
1424                      size_t       arrayLen,
1425                      size_t      * pexCount )
1426{
1427    size_t          i;
1428    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
1429    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
1430    const uint8_t * walk = array;
1431    tr_pex        * pex = tr_new0( tr_pex, n );
1432   
1433    for( i = 0 ; i < n ; i++ ) {
1434        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
1435        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
1436        pex[i].flags = 0x00;
1437        walk += sizeof( tr_address ) + 2;
1438    }
1439   
1440    *pexCount = n;
1441    return pex;
1442}
1443
1444/**
1445***
1446**/
1447
1448void
1449tr_peerMgrSetBlame( tr_peerMgr *     manager,
1450                    const uint8_t *  torrentHash,
1451                    tr_piece_index_t pieceIndex,
1452                    int              success )
1453{
1454    if( !success )
1455    {
1456        int        peerCount, i;
1457        Torrent *  t = getExistingTorrent( manager, torrentHash );
1458        tr_peer ** peers;
1459
1460        assert( torrentIsLocked( t ) );
1461
1462        peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1463        for( i = 0; i < peerCount; ++i )
1464        {
1465            tr_peer * peer = peers[i];
1466            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1467            {
1468                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1469                        tr_peerIoAddrStr( &peer->addr, peer->port ),
1470                        pieceIndex, (int)peer->strikes + 1 );
1471                addStrike( t, peer );
1472            }
1473        }
1474    }
1475}
1476
1477int
1478tr_pexCompare( const void * va, const void * vb )
1479{
1480    const tr_pex * a = va;
1481    const tr_pex * b = vb;
1482    int i;
1483
1484    assert( tr_isPex( a ) );
1485    assert( tr_isPex( b ) );
1486
1487    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
1488        return i;
1489
1490    if( a->port != b->port )
1491        return a->port < b->port ? -1 : 1;
1492
1493    return 0;
1494}
1495
1496static int
1497peerPrefersCrypto( const tr_peer * peer )
1498{
1499    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1500        return TRUE;
1501
1502    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1503        return FALSE;
1504
1505    return tr_peerIoIsEncrypted( peer->io );
1506}
1507
1508int
1509tr_peerMgrGetPeers( tr_peerMgr      * manager,
1510                    const uint8_t   * torrentHash,
1511                    tr_pex         ** setme_pex,
1512                    uint8_t           af)
1513{
1514    int peerCount = 0;
1515    int peersReturning = 0;
1516    const Torrent *  t;
1517
1518    managerLock( manager );
1519
1520    t = getExistingTorrent( manager, torrentHash );
1521    if( t == NULL )
1522    {
1523        *setme_pex = NULL;
1524    }
1525    else
1526    {
1527        int i;
1528        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1529        /* for now, this will waste memory on torrents that have both
1530         * ipv6 and ipv4 peers */
1531        tr_pex * pex = tr_new( tr_pex, peerCount );
1532        tr_pex * walk = pex;
1533
1534        for( i=0; i<peerCount; ++i )
1535        {
1536            const tr_peer * peer = peers[i];
1537            if( peer->addr.type == af )
1538            {
1539                const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1540                assert( tr_isAddress( &peer->addr ) );
1541                walk->addr = peer->addr;
1542                walk->port = peer->port;
1543                walk->flags = 0;
1544                if( peerPrefersCrypto( peer ) )
1545                    walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1546                if( ( atom->uploadOnly == UPLOAD_ONLY_YES ) ||
1547                    ( peer->progress >= 1.0 ) )
1548                    walk->flags |= ADDED_F_SEED_FLAG;
1549                ++peersReturning;
1550                ++walk;
1551            }
1552        }
1553
1554        assert( ( walk - pex ) == peersReturning );
1555        qsort( pex, peersReturning, sizeof( tr_pex ), tr_pexCompare );
1556
1557        *setme_pex = pex;
1558    }
1559
1560    managerUnlock( manager );
1561    return peersReturning;
1562}
1563
1564static int reconnectPulse( void * vtorrent );
1565
1566static int rechokePulse( void * vtorrent );
1567
1568void
1569tr_peerMgrStartTorrent( tr_peerMgr *    manager,
1570                        const uint8_t * torrentHash )
1571{
1572    Torrent * t;
1573
1574    managerLock( manager );
1575
1576    t = getExistingTorrent( manager, torrentHash );
1577
1578    assert( t );
1579    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1580    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1581
1582    if( !t->isRunning )
1583    {
1584        t->isRunning = 1;
1585
1586        t->reconnectTimer = tr_timerNew( t->manager->session,
1587                                         reconnectPulse, t,
1588                                         RECONNECT_PERIOD_MSEC );
1589
1590        t->rechokeTimer = tr_timerNew( t->manager->session,
1591                                       rechokePulse, t,
1592                                       RECHOKE_PERIOD_MSEC );
1593
1594        reconnectPulse( t );
1595
1596        rechokePulse( t );
1597
1598        if( !tr_ptrArrayEmpty( t->webseeds ) )
1599            refillSoon( t );
1600    }
1601
1602    managerUnlock( manager );
1603}
1604
1605static void
1606stopTorrent( Torrent * t )
1607{
1608    assert( torrentIsLocked( t ) );
1609
1610    t->isRunning = 0;
1611    tr_timerFree( &t->rechokeTimer );
1612    tr_timerFree( &t->reconnectTimer );
1613
1614    /* disconnect the peers. */
1615    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)peerDestructor );
1616    tr_ptrArrayClear( t->peers );
1617
1618    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1619     * which removes the handshake from t->outgoingHandshakes... */
1620    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1621        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1622}
1623
1624void
1625tr_peerMgrStopTorrent( tr_peerMgr *    manager,
1626                       const uint8_t * torrentHash )
1627{
1628    managerLock( manager );
1629
1630    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1631
1632    managerUnlock( manager );
1633}
1634
1635void
1636tr_peerMgrAddTorrent( tr_peerMgr * manager,
1637                      tr_torrent * tor )
1638{
1639    Torrent * t;
1640
1641    managerLock( manager );
1642
1643    assert( tor );
1644    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1645
1646    t = torrentConstructor( manager, tor );
1647    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1648
1649    managerUnlock( manager );
1650}
1651
1652void
1653tr_peerMgrRemoveTorrent( tr_peerMgr *    manager,
1654                         const uint8_t * torrentHash )
1655{
1656    Torrent * t;
1657
1658    managerLock( manager );
1659
1660    t = getExistingTorrent( manager, torrentHash );
1661    assert( t );
1662    stopTorrent( t );
1663    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1664    torrentDestructor( t );
1665
1666    managerUnlock( manager );
1667}
1668
1669void
1670tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1671                               const uint8_t *    torrentHash,
1672                               int8_t *           tab,
1673                               unsigned int       tabCount )
1674{
1675    tr_piece_index_t   i;
1676    const Torrent *    t;
1677    const tr_torrent * tor;
1678    float              interval;
1679    int                isSeed;
1680    int                peerCount;
1681    const tr_peer **   peers;
1682
1683    managerLock( manager );
1684
1685    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1686    tor = t->tor;
1687    interval = tor->info.pieceCount / (float)tabCount;
1688    isSeed = tor && ( tr_cpGetStatus ( tor->completion ) == TR_SEED );
1689    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1690
1691    memset( tab, 0, tabCount );
1692
1693    for( i = 0; tor && i < tabCount; ++i )
1694    {
1695        const int piece = i * interval;
1696
1697        if( isSeed || tr_cpPieceIsComplete( tor->completion, piece ) )
1698            tab[i] = -1;
1699        else if( peerCount )
1700        {
1701            int j;
1702            for( j = 0; j < peerCount; ++j )
1703                if( tr_bitfieldHas( peers[j]->have, i ) )
1704                    ++tab[i];
1705        }
1706    }
1707
1708    managerUnlock( manager );
1709}
1710
1711/* Returns the pieces that are available from peers */
1712tr_bitfield*
1713tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1714                        const uint8_t *    torrentHash )
1715{
1716    int           i, size;
1717    Torrent *     t;
1718    tr_peer **    peers;
1719    tr_bitfield * pieces;
1720    managerLock( manager );
1721
1722    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1723    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1724    peers = getConnectedPeers( t, &size );
1725    for( i = 0; i < size; ++i )
1726        tr_bitfieldOr( pieces, peers[i]->have );
1727
1728    managerUnlock( manager );
1729    tr_free( peers );
1730    return pieces;
1731}
1732
1733int
1734tr_peerMgrHasConnections( const tr_peerMgr * manager,
1735                          const uint8_t *    torrentHash )
1736{
1737    int ret;
1738    const Torrent * t;
1739    managerLock( manager );
1740
1741    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1742    ret = t && ( !tr_ptrArrayEmpty( t->peers ) || !tr_ptrArrayEmpty( t->webseeds ) );
1743
1744    managerUnlock( manager );
1745    return ret;
1746}
1747
1748void
1749tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1750                        const uint8_t    * torrentHash,
1751                        int              * setmePeersKnown,
1752                        int              * setmePeersConnected,
1753                        int              * setmeSeedsConnected,
1754                        int              * setmeWebseedsSendingToUs,
1755                        int              * setmePeersSendingToUs,
1756                        int              * setmePeersGettingFromUs,
1757                        int              * setmePeersFrom )
1758{
1759    int i, size;
1760    const Torrent * t;
1761    const tr_peer ** peers;
1762    const tr_webseed ** webseeds;
1763
1764    managerLock( manager );
1765
1766    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1767    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1768
1769    *setmePeersKnown           = tr_ptrArraySize( t->pool );
1770    *setmePeersConnected       = 0;
1771    *setmeSeedsConnected       = 0;
1772    *setmePeersGettingFromUs   = 0;
1773    *setmePeersSendingToUs     = 0;
1774    *setmeWebseedsSendingToUs  = 0;
1775
1776    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1777        setmePeersFrom[i] = 0;
1778
1779    for( i=0; i<size; ++i )
1780    {
1781        const tr_peer * peer = peers[i];
1782        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1783
1784        if( peer->io == NULL ) /* not connected */
1785            continue;
1786
1787        ++*setmePeersConnected;
1788
1789        ++setmePeersFrom[atom->from];
1790
1791        if( clientIsDownloadingFrom( peer ) )
1792            ++*setmePeersSendingToUs;
1793
1794        if( clientIsUploadingTo( peer ) )
1795            ++*setmePeersGettingFromUs;
1796
1797        if( atom->flags & ADDED_F_SEED_FLAG )
1798            ++*setmeSeedsConnected;
1799    }
1800
1801    webseeds = (const tr_webseed **) tr_ptrArrayPeek( t->webseeds, &size );
1802    for( i=0; i<size; ++i )
1803    {
1804        if( tr_webseedIsActive( webseeds[i] ) )
1805            ++*setmeWebseedsSendingToUs;
1806    }
1807
1808    managerUnlock( manager );
1809}
1810
1811float*
1812tr_peerMgrWebSpeeds( const tr_peerMgr * manager,
1813                     const uint8_t *    torrentHash )
1814{
1815    const Torrent * t;
1816    const tr_webseed ** webseeds;
1817    int i;
1818    int webseedCount;
1819    float * ret;
1820
1821    assert( manager );
1822    managerLock( manager );
1823
1824    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1825    webseeds = (const tr_webseed**) tr_ptrArrayPeek( t->webseeds,
1826                                                     &webseedCount );
1827    assert( webseedCount == t->tor->info.webseedCount );
1828    ret = tr_new0( float, webseedCount );
1829
1830    for( i=0; i<webseedCount; ++i )
1831        if( !tr_webseedGetSpeed( webseeds[i], &ret[i] ) )
1832            ret[i] = -1.0;
1833
1834    managerUnlock( manager );
1835    return ret;
1836}
1837
1838double
1839tr_peerGetPieceSpeed( const tr_peer * peer, tr_direction direction )
1840{
1841    assert( peer );
1842    assert( direction==TR_CLIENT_TO_PEER || direction==TR_PEER_TO_CLIENT );
1843
1844    return tr_bandwidthGetPieceSpeed( peer->bandwidth, direction );
1845}
1846
1847
1848struct tr_peer_stat *
1849tr_peerMgrPeerStats( const   tr_peerMgr  * manager,
1850                     const   uint8_t     * torrentHash,
1851                     int                 * setmeCount UNUSED )
1852{
1853    int             i, size;
1854    const Torrent * t;
1855    tr_peer **      peers;
1856    tr_peer_stat *  ret;
1857
1858    assert( manager );
1859    managerLock( manager );
1860
1861    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1862    peers = getConnectedPeers( (Torrent*)t, &size );
1863    ret = tr_new0( tr_peer_stat, size );
1864
1865    for( i = 0; i < size; ++i )
1866    {
1867        char *                   pch;
1868        const tr_peer *          peer = peers[i];
1869        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1870        tr_peer_stat *           stat = ret + i;
1871        tr_address               norm_addr;
1872
1873        memcpy( &norm_addr, &peer->addr, sizeof( tr_address ) );
1874        tr_normalizeV4Mapped( &norm_addr );
1875        tr_ntop( &norm_addr, stat->addr, sizeof( stat->addr ) );
1876        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
1877                   sizeof( stat->client ) );
1878        stat->port               = ntohs( peer->port );
1879        stat->from               = atom->from;
1880        stat->progress           = peer->progress;
1881        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1882        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER );
1883        stat->rateToClient       = tr_peerGetPieceSpeed( peer, TR_PEER_TO_CLIENT );
1884        stat->peerIsChoked       = peer->peerIsChoked;
1885        stat->peerIsInterested   = peer->peerIsInterested;
1886        stat->clientIsChoked     = peer->clientIsChoked;
1887        stat->clientIsInterested = peer->clientIsInterested;
1888        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1889        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1890        stat->isUploadingTo      = clientIsUploadingTo( peer );
1891        stat->isSeed             = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
1892
1893        pch = stat->flagStr;
1894        if( t->optimistic == peer ) *pch++ = 'O';
1895        if( stat->isDownloadingFrom ) *pch++ = 'D';
1896        else if( stat->clientIsInterested ) *pch++ = 'd';
1897        if( stat->isUploadingTo ) *pch++ = 'U';
1898        else if( stat->peerIsInterested ) *pch++ = 'u';
1899        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
1900        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1901        if( stat->isEncrypted ) *pch++ = 'E';
1902        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1903        if( stat->isIncoming ) *pch++ = 'I';
1904        *pch = '\0';
1905    }
1906
1907    *setmeCount = size;
1908    tr_free( peers );
1909
1910    managerUnlock( manager );
1911    return ret;
1912}
1913
1914/**
1915***
1916**/
1917
1918struct ChokeData
1919{
1920    tr_bool         doUnchoke;
1921    tr_bool         isInterested;
1922    tr_bool         isChoked;
1923    int             rate;
1924    tr_peer *       peer;
1925};
1926
1927static int
1928compareChoke( const void * va,
1929              const void * vb )
1930{
1931    const struct ChokeData * a = va;
1932    const struct ChokeData * b = vb;
1933
1934    if( a->rate != b->rate ) /* prefer higher overall speeds */
1935        return a->rate > b->rate ? -1 : 1;
1936
1937    if( a->isChoked != b->isChoked ) /* prefer unchoked */
1938        return a->isChoked ? 1 : -1;
1939
1940    return 0;
1941}
1942
1943static int
1944isNew( const tr_peer * peer )
1945{
1946    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1947}
1948
1949static int
1950isSame( const tr_peer * peer )
1951{
1952    return peer && peer->client && strstr( peer->client, "Transmission" );
1953}
1954
1955/**
1956***
1957**/
1958
1959static void
1960rechoke( Torrent * t )
1961{
1962    int                i, peerCount, size, unchokedInterested;
1963    tr_peer **         peers = getConnectedPeers( t, &peerCount );
1964    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1965    const int          chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
1966
1967    assert( torrentIsLocked( t ) );
1968
1969    /* sort the peers by preference and rate */
1970    for( i = 0, size = 0; i < peerCount; ++i )
1971    {
1972        tr_peer * peer = peers[i];
1973        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1974
1975        if( peer->progress >= 1.0 ) /* choke all seeds */
1976        {
1977            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1978        }
1979        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
1980        {
1981            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1982        }
1983        else if( chokeAll ) /* choke everyone if we're not uploading */
1984        {
1985            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1986        }
1987        else
1988        {
1989            struct ChokeData * n = &choke[size++];
1990            n->peer         = peer;
1991            n->isInterested = peer->peerIsInterested;
1992            n->isChoked     = peer->peerIsChoked;
1993            n->rate         = tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER ) * 1024;
1994        }
1995    }
1996
1997    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
1998
1999    /**
2000     * Reciprocation and number of uploads capping is managed by unchoking
2001     * the N peers which have the best upload rate and are interested.
2002     * This maximizes the client's download rate. These N peers are
2003     * referred to as downloaders, because they are interested in downloading
2004     * from the client.
2005     *
2006     * Peers which have a better upload rate (as compared to the downloaders)
2007     * but aren't interested get unchoked. If they become interested, the
2008     * downloader with the worst upload rate gets choked. If a client has
2009     * a complete file, it uses its upload rate rather than its download
2010     * rate to decide which peers to unchoke.
2011     */
2012    unchokedInterested = 0;
2013    for( i=0; i<size && unchokedInterested<MAX_UNCHOKED_PEERS; ++i ) {
2014        choke[i].doUnchoke = 1;
2015        if( choke[i].isInterested )
2016            ++unchokedInterested;
2017    }
2018
2019    /* optimistic unchoke */
2020    if( i < size )
2021    {
2022        int n;
2023        struct ChokeData * c;
2024        tr_ptrArray * randPool = tr_ptrArrayNew( );
2025
2026        for( ; i<size; ++i )
2027        {
2028            if( choke[i].isInterested )
2029            {
2030                const tr_peer * peer = choke[i].peer;
2031                int x = 1, y;
2032                if( isNew( peer ) ) x *= 3;
2033                if( isSame( peer ) ) x *= 3;
2034                for( y=0; y<x; ++y )
2035                    tr_ptrArrayAppend( randPool, &choke[i] );
2036            }
2037        }
2038
2039        if(( n = tr_ptrArraySize( randPool )))
2040        {
2041            c = tr_ptrArrayNth( randPool, tr_cryptoWeakRandInt( n ));
2042            c->doUnchoke = 1;
2043            t->optimistic = c->peer;
2044        }
2045
2046        tr_ptrArrayFree( randPool, NULL );
2047    }
2048
2049    for( i = 0; i < size; ++i )
2050        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
2051
2052    /* cleanup */
2053    tr_free( choke );
2054    tr_free( peers );
2055}
2056
2057static int
2058rechokePulse( void * vtorrent )
2059{
2060    Torrent * t = vtorrent;
2061
2062    torrentLock( t );
2063    rechoke( t );
2064    torrentUnlock( t );
2065    return TRUE;
2066}
2067
2068/***
2069****
2070****  Life and Death
2071****
2072***/
2073
2074static int
2075shouldPeerBeClosed( const Torrent * t,
2076                    const tr_peer * peer,
2077                    int             peerCount )
2078{
2079    const tr_torrent *       tor = t->tor;
2080    const time_t             now = time( NULL );
2081    const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
2082
2083    /* if it's marked for purging, close it */
2084    if( peer->doPurge )
2085    {
2086        tordbg( t, "purging peer %s because its doPurge flag is set",
2087                tr_peerIoAddrStr( &atom->addr, atom->port ) );
2088        return TRUE;
2089    }
2090
2091    /* if we're seeding and the peer has everything we have,
2092     * and enough time has passed for a pex exchange, then disconnect */
2093    if( tr_torrentIsSeed( tor ) )
2094    {
2095        int peerHasEverything;
2096        if( atom->flags & ADDED_F_SEED_FLAG )
2097            peerHasEverything = TRUE;
2098        else if( peer->progress < tr_cpPercentDone( tor->completion ) )
2099            peerHasEverything = FALSE;
2100        else {
2101            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( tor->completion ) );
2102            tr_bitfieldDifference( tmp, peer->have );
2103            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2104            tr_bitfieldFree( tmp );
2105        }
2106        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 ))) {
2107            tordbg( t, "purging peer %s because we're both seeds",
2108                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2109            return TRUE;
2110        }
2111    }
2112
2113    /* disconnect if it's been too long since piece data has been transferred.
2114     * this is on a sliding scale based on number of available peers... */
2115    {
2116        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2117        /* if we have >= relaxIfFewerThan, strictness is 100%.
2118         * if we have zero connections, strictness is 0% */
2119        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
2120            ? 1.0
2121            : peerCount / (float)relaxStrictnessIfFewerThanN;
2122        const int lo = MIN_UPLOAD_IDLE_SECS;
2123        const int hi = MAX_UPLOAD_IDLE_SECS;
2124        const int limit = hi - ( ( hi - lo ) * strictness );
2125        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
2126/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
2127        if( idleTime > limit ) {
2128            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
2129                       tr_peerIoAddrStr( &atom->addr, atom->port ), idleTime );
2130            return TRUE;
2131        }
2132    }
2133
2134    return FALSE;
2135}
2136
2137static tr_peer **
2138getPeersToClose( Torrent * t, int * setmeSize )
2139{
2140    int i, peerCount, outsize;
2141    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( t->peers, &peerCount );
2142    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2143
2144    assert( torrentIsLocked( t ) );
2145
2146    for( i = outsize = 0; i < peerCount; ++i )
2147        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
2148            ret[outsize++] = peers[i];
2149
2150    *setmeSize = outsize;
2151    return ret;
2152}
2153
2154static int
2155compareCandidates( const void * va,
2156                   const void * vb )
2157{
2158    const struct peer_atom * a = *(const struct peer_atom**) va;
2159    const struct peer_atom * b = *(const struct peer_atom**) vb;
2160
2161    /* <Charles> Here we would probably want to try reconnecting to
2162     * peers that had most recently given us data. Lots of users have
2163     * trouble with resets due to their routers and/or ISPs. This way we
2164     * can quickly recover from an unwanted reset. So we sort
2165     * piece_data_time in descending order.
2166     */
2167
2168    if( a->piece_data_time != b->piece_data_time )
2169        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2170
2171    if( a->numFails != b->numFails )
2172        return a->numFails < b->numFails ? -1 : 1;
2173
2174    if( a->time != b->time )
2175        return a->time < b->time ? -1 : 1;
2176
2177    /* all other things being equal, prefer peers whose
2178     * information comes from a more reliable source */
2179    if( a->from != b->from )
2180        return a->from < b->from ? -1 : 1;
2181
2182    return 0;
2183}
2184
2185static int
2186getReconnectIntervalSecs( const struct peer_atom * atom )
2187{
2188    int          sec;
2189    const time_t now = time( NULL );
2190
2191    /* if we were recently connected to this peer and transferring piece
2192     * data, try to reconnect to them sooner rather that later -- we don't
2193     * want network troubles to get in the way of a good peer. */
2194    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2195        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2196
2197    /* don't allow reconnects more often than our minimum */
2198    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2199        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2200
2201    /* otherwise, the interval depends on how many times we've tried
2202     * and failed to connect to the peer */
2203    else switch( atom->numFails ) {
2204        case 0: sec = 0; break;
2205        case 1: sec = 5; break;
2206        case 2: sec = 2 * 60; break;
2207        case 3: sec = 15 * 60; break;
2208        case 4: sec = 30 * 60; break;
2209        case 5: sec = 60 * 60; break;
2210        default: sec = 120 * 60; break;
2211    }
2212
2213    return sec;
2214}
2215
2216static struct peer_atom **
2217getPeerCandidates( Torrent * t, int * setmeSize )
2218{
2219    int                 i, atomCount, retCount;
2220    struct peer_atom ** atoms;
2221    struct peer_atom ** ret;
2222    const time_t        now = time( NULL );
2223    const int           seed = tr_torrentIsSeed( t->tor );
2224
2225    assert( torrentIsLocked( t ) );
2226
2227    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &atomCount );
2228    ret = tr_new( struct peer_atom*, atomCount );
2229    for( i = retCount = 0; i < atomCount; ++i )
2230    {
2231        int                interval;
2232        struct peer_atom * atom = atoms[i];
2233
2234        /* peer fed us too much bad data ... we only keep it around
2235         * now to weed it out in case someone sends it to us via pex */
2236        if( atom->myflags & MYFLAG_BANNED )
2237            continue;
2238
2239        /* peer was unconnectable before, so we're not going to keep trying.
2240         * this is needs a separate flag from `banned', since if they try
2241         * to connect to us later, we'll let them in */
2242        if( atom->myflags & MYFLAG_UNREACHABLE )
2243            continue;
2244
2245        /* we don't need two connections to the same peer... */
2246        if( peerIsInUse( t, &atom->addr ) )
2247            continue;
2248
2249        /* no need to connect if we're both seeds... */
2250        if( seed && ( ( atom->flags & ADDED_F_SEED_FLAG ) ||
2251                      ( atom->uploadOnly == UPLOAD_ONLY_YES ) ) )
2252            continue;
2253
2254        /* don't reconnect too often */
2255        interval = getReconnectIntervalSecs( atom );
2256        if( ( now - atom->time ) < interval )
2257        {
2258            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2259                    i, tr_peerIoAddrStr( &atom->addr, atom->port ), interval );
2260            continue;
2261        }
2262
2263        /* Don't connect to peers in our blocklist */
2264        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2265            continue;
2266
2267        ret[retCount++] = atom;
2268    }
2269
2270    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2271    *setmeSize = retCount;
2272    return ret;
2273}
2274
2275static int
2276reconnectPulse( void * vtorrent )
2277{
2278    Torrent *     t = vtorrent;
2279    static time_t prevTime = 0;
2280    static int    newConnectionsThisSecond = 0;
2281    time_t        now;
2282
2283    torrentLock( t );
2284
2285    now = time( NULL );
2286    if( prevTime != now )
2287    {
2288        prevTime = now;
2289        newConnectionsThisSecond = 0;
2290    }
2291
2292    if( !t->isRunning )
2293    {
2294        removeAllPeers( t );
2295    }
2296    else
2297    {
2298        int i, nCandidates, nBad;
2299        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
2300        struct tr_peer ** connections = getPeersToClose( t, &nBad );
2301
2302        if( nBad || nCandidates )
2303            tordbg( t, "reconnect pulse for [%s]: %d bad connections, "
2304                   "%d connection candidates, %d atoms, max per pulse is %d",
2305                   t->tor->info.name, nBad, nCandidates,
2306                   tr_ptrArraySize( t->pool ),
2307                   (int)MAX_RECONNECTIONS_PER_PULSE );
2308
2309        /* disconnect some peers.
2310           if we transferred piece data, then they might be good peers,
2311           so reset their `numFails' weight to zero.  otherwise we connected
2312           to them fruitlessly, so mark it as another fail */
2313        for( i = 0; i < nBad; ++i ) {
2314            tr_peer * peer = connections[i];
2315            struct peer_atom * atom = getExistingAtom( t, &peer->addr );
2316            if( atom->piece_data_time )
2317                atom->numFails = 0;
2318            else
2319                ++atom->numFails;
2320             
2321            tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
2322            removePeer( t, peer );
2323        }
2324
2325        /* add some new ones */
2326        for( i = 0;    ( i < nCandidates )
2327           && ( i < MAX_RECONNECTIONS_PER_PULSE )
2328           && ( getPeerCount( t ) < getMaxPeerCount( t->tor ) )
2329           && ( newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND ); ++i )
2330        {
2331            tr_peerMgr *       mgr = t->manager;
2332            struct peer_atom * atom = candidates[i];
2333            tr_peerIo *        io;
2334
2335            tordbg( t, "Starting an OUTGOING connection with %s",
2336                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2337
2338            io = tr_peerIoNewOutgoing( mgr->session, &atom->addr, atom->port, t->hash );
2339            if( io == NULL )
2340            {
2341                atom->myflags |= MYFLAG_UNREACHABLE;
2342            }
2343            else
2344            {
2345                tr_handshake * handshake = tr_handshakeNew( io,
2346                                                            mgr->session->encryptionMode,
2347                                                            myHandshakeDoneCB,
2348                                                            mgr );
2349
2350                assert( tr_peerIoGetTorrentHash( io ) );
2351
2352                ++newConnectionsThisSecond;
2353
2354                tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake,
2355                                         handshakeCompare );
2356            }
2357
2358            atom->time = time( NULL );
2359        }
2360
2361        /* cleanup */
2362        tr_free( connections );
2363        tr_free( candidates );
2364    }
2365
2366    torrentUnlock( t );
2367    return TRUE;
2368}
2369
2370/****
2371*****
2372*****  BANDWIDTH ALLOCATION
2373*****
2374****/
2375
2376static void
2377pumpAllPeers( tr_peerMgr * mgr )
2378{
2379    const int torrentCount = tr_ptrArraySize( mgr->torrents );
2380    int       i, j;
2381
2382    for( i=0; i<torrentCount; ++i )
2383    {
2384        Torrent * t = tr_ptrArrayNth( mgr->torrents, i );
2385        for( j=0; j<tr_ptrArraySize( t->peers ); ++j )
2386        {
2387            tr_peer * peer = tr_ptrArrayNth( t->peers, j );
2388            tr_peerMsgsPulse( peer->msgs );
2389        }
2390    }
2391}
2392
2393static int
2394bandwidthPulse( void * vmgr )
2395{
2396    tr_handshake * handshake;
2397    tr_peerMgr * mgr = vmgr;
2398    managerLock( mgr );
2399
2400    /* FIXME: this next line probably isn't necessary... */
2401    pumpAllPeers( mgr );
2402
2403    /* allocate bandwidth to the peers */
2404    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
2405    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
2406
2407    /* free all the finished handshakes */
2408    while(( handshake = tr_ptrArrayPop( mgr->finishedHandshakes )))
2409        tr_handshakeFree( handshake );
2410
2411    managerUnlock( mgr );
2412    return TRUE;
2413}
Note: See TracBrowser for help on using the repository browser.