source: trunk/libtransmission/peer-mgr.c @ 7448

Last change on this file since 7448 was 7448, checked in by jhujhiti, 12 years ago

add some debugging stuff to track down where some bogus addresses are coming from

  • Property svn:keywords set to Date Rev Author Id
File size: 68.0 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 7448 2008-12-21 19:13:52Z jhujhiti $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h> /* memcpy, memcmp, strstr */
16#include <stdlib.h> /* qsort */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "bandwidth.h"
23#include "bencode.h"
24#include "blocklist.h"
25#include "clients.h"
26#include "completion.h"
27#include "crypto.h"
28#include "handshake.h"
29#include "inout.h" /* tr_ioTestPiece */
30#include "net.h"
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-mgr-private.h"
34#include "peer-msgs.h"
35#include "ptrarray.h"
36#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
37#include "torrent.h"
38#include "trevent.h"
39#include "utils.h"
40#include "webseed.h"
41
42enum
43{
44    /* how frequently to change which peers are choked */
45    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
46
47    /* minimum interval for refilling peers' request lists */
48    REFILL_PERIOD_MSEC = 333,
49
50    /* when many peers are available, keep idle ones this long */
51    MIN_UPLOAD_IDLE_SECS = ( 30 ),
52
53    /* when few peers are available, keep idle ones this long */
54    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
55
56    /* how frequently to decide which peers live and die */
57    RECONNECT_PERIOD_MSEC = ( 2 * 1000 ),
58   
59    /* how frequently to reallocate bandwidth */
60    BANDWIDTH_PERIOD_MSEC = 500,
61
62    /* max # of peers to ask fer per torrent per reconnect pulse */
63    MAX_RECONNECTIONS_PER_PULSE = 4,
64
65    /* max number of peers to ask for per second overall.
66    * this throttle is to avoid overloading the router */
67    MAX_CONNECTIONS_PER_SECOND = 8,
68
69    /* number of unchoked peers per torrent.
70     * FIXME: this probably ought to be configurable */
71    MAX_UNCHOKED_PEERS = 14,
72
73    /* number of bad pieces a peer is allowed to send before we ban them */
74    MAX_BAD_PIECES_PER_PEER = 5,
75
76    /* use for bitwise operations w/peer_atom.myflags */
77    MYFLAG_BANNED = 1,
78
79    /* unreachable for now... but not banned.
80     * if they try to connect to us it's okay */
81    MYFLAG_UNREACHABLE = 2,
82
83    /* the minimum we'll wait before attempting to reconnect to a peer */
84    MINIMUM_RECONNECT_INTERVAL_SECS = 5
85};
86
87
88/**
89***
90**/
91
92enum
93{
94    UPLOAD_ONLY_UKNOWN,
95    UPLOAD_ONLY_YES,
96    UPLOAD_ONLY_NO
97};
98
99/**
100 * Peer information that should be kept even before we've connected and
101 * after we've disconnected.  These are kept in a pool of peer_atoms to decide
102 * which ones would make good candidates for connecting to, and to watch out
103 * for banned peers.
104 *
105 * @see tr_peer
106 * @see tr_peermsgs
107 */
108struct peer_atom
109{
110    uint8_t     from;
111    uint8_t     flags;       /* these match the added_f flags */
112    uint8_t     myflags;     /* flags that aren't defined in added_f */
113    uint8_t     uploadOnly;  /* UPLOAD_ONLY_ */
114    tr_port     port;
115    uint16_t    numFails;
116    tr_address  addr;
117    time_t      time;        /* when the peer's connection status last changed */
118    time_t      piece_data_time;
119};
120
121typedef struct
122{
123    tr_bool         isRunning;
124
125    uint8_t         hash[SHA_DIGEST_LENGTH];
126    int         *   pendingRequestCount;
127    tr_ptrArray *   outgoingHandshakes; /* tr_handshake */
128    tr_ptrArray *   pool; /* struct peer_atom */
129    tr_ptrArray *   peers; /* tr_peer */
130    tr_ptrArray *   webseeds; /* tr_webseed */
131    tr_timer *      reconnectTimer;
132    tr_timer *      rechokeTimer;
133    tr_timer *      refillTimer;
134    tr_torrent *    tor;
135    tr_peer *       optimistic; /* the optimistic peer, or NULL if none */
136
137    struct tr_peerMgr * manager;
138}
139Torrent;
140
141struct tr_peerMgr
142{
143    tr_session      * session;
144    tr_ptrArray     * torrents; /* Torrent */
145    tr_ptrArray     * incomingHandshakes; /* tr_handshake */
146    tr_ptrArray     * finishedHandshakes; /* tr_handshake */
147    tr_timer        * bandwidthTimer;
148};
149
150#define tordbg( t, ... ) \
151    do { \
152        if( tr_deepLoggingIsActive( ) ) \
153            tr_deepLog( __FILE__, __LINE__, t->tor->info.name, __VA_ARGS__ ); \
154    } while( 0 )
155
156#define dbgmsg( ... ) \
157    do { \
158        if( tr_deepLoggingIsActive( ) ) \
159            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
160    } while( 0 )
161
162/**
163***
164**/
165
166static void
167managerLock( const struct tr_peerMgr * manager )
168{
169    tr_globalLock( manager->session );
170}
171
172static void
173managerUnlock( const struct tr_peerMgr * manager )
174{
175    tr_globalUnlock( manager->session );
176}
177
178static void
179torrentLock( Torrent * torrent )
180{
181    managerLock( torrent->manager );
182}
183
184static void
185torrentUnlock( Torrent * torrent )
186{
187    managerUnlock( torrent->manager );
188}
189
190static int
191torrentIsLocked( const Torrent * t )
192{
193    return tr_globalIsLocked( t->manager->session );
194}
195
196/**
197***
198**/
199
200static int
201handshakeCompareToAddr( const void * va,
202                        const void * vb )
203{
204    const tr_handshake * a = va;
205
206    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
207}
208
209static int
210handshakeCompare( const void * a,
211                  const void * b )
212{
213    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
214}
215
216static tr_handshake*
217getExistingHandshake( tr_ptrArray      * handshakes,
218                      const tr_address * addr )
219{
220    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
221}
222
223static int
224comparePeerAtomToAddress( const void * va, const void * vb )
225{
226    const struct peer_atom * a = va;
227
228    return tr_compareAddresses( &a->addr, vb );
229}
230
231static int
232comparePeerAtoms( const void * va, const void * vb )
233{
234    const struct peer_atom * b = vb;
235
236    return comparePeerAtomToAddress( va, &b->addr );
237}
238
239/**
240***
241**/
242
243static int
244torrentCompare( const void * va,
245                const void * vb )
246{
247    const Torrent * a = va;
248    const Torrent * b = vb;
249
250    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
251}
252
253static int
254torrentCompareToHash( const void * va,
255                      const void * vb )
256{
257    const Torrent * a = va;
258    const uint8_t * b_hash = vb;
259
260    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
261}
262
263static Torrent*
264getExistingTorrent( tr_peerMgr *    manager,
265                    const uint8_t * hash )
266{
267    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
268                                             hash,
269                                             torrentCompareToHash );
270}
271
272static int
273peerCompare( const void * va, const void * vb )
274{
275    const tr_peer * a = va;
276    const tr_peer * b = vb;
277
278    return tr_compareAddresses( &a->addr, &b->addr );
279}
280
281static int
282peerCompareToAddr( const void * va, const void * vb )
283{
284    const tr_peer * a = va;
285
286    return tr_compareAddresses( &a->addr, vb );
287}
288
289static tr_peer*
290getExistingPeer( Torrent          * torrent,
291                 const tr_address * addr )
292{
293    assert( torrentIsLocked( torrent ) );
294    assert( addr );
295
296    return tr_ptrArrayFindSorted( torrent->peers, addr, peerCompareToAddr );
297}
298
299static struct peer_atom*
300getExistingAtom( const Torrent    * t,
301                 const tr_address * addr )
302{
303    assert( torrentIsLocked( t ) );
304    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
305}
306
307static tr_bool
308peerIsInUse( const Torrent    * ct,
309             const tr_address * addr )
310{
311    Torrent * t = (Torrent*) ct;
312
313    assert( torrentIsLocked ( t ) );
314
315    return getExistingPeer( t, addr )
316        || getExistingHandshake( t->outgoingHandshakes, addr )
317        || getExistingHandshake( t->manager->incomingHandshakes, addr );
318}
319
320static tr_peer*
321peerConstructor( tr_torrent * tor, const tr_address * addr )
322{
323    tr_peer * p;
324    p = tr_new0( tr_peer, 1 );
325    p->addr = *addr;
326    p->bandwidth = tr_bandwidthNew( tor->session, tor->bandwidth );
327    return p;
328}
329
330static tr_peer*
331getPeer( Torrent          * torrent,
332         const tr_address * addr )
333{
334    tr_peer * peer;
335
336    assert( torrentIsLocked( torrent ) );
337
338    peer = getExistingPeer( torrent, addr );
339
340    if( peer == NULL )
341    {
342        peer = peerConstructor( torrent->tor, addr );
343        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
344    }
345
346    return peer;
347}
348
349static void
350peerDestructor( tr_peer * peer )
351{
352    assert( peer );
353
354    if( peer->msgs != NULL )
355    {
356        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
357        tr_peerMsgsFree( peer->msgs );
358    }
359
360    tr_peerIoFree( peer->io );
361
362    tr_bitfieldFree( peer->have );
363    tr_bitfieldFree( peer->blame );
364    tr_free( peer->client );
365
366    tr_bandwidthFree( peer->bandwidth );
367
368    tr_free( peer );
369}
370
371static void
372removePeer( Torrent * t,
373            tr_peer * peer )
374{
375    tr_peer *          removed;
376    struct peer_atom * atom;
377
378    assert( torrentIsLocked( t ) );
379
380    atom = getExistingAtom( t, &peer->addr );
381    assert( atom );
382    atom->time = time( NULL );
383
384    removed = tr_ptrArrayRemoveSorted( t->peers, peer, peerCompare );
385    assert( removed == peer );
386    peerDestructor( removed );
387}
388
389static void
390removeAllPeers( Torrent * t )
391{
392    while( !tr_ptrArrayEmpty( t->peers ) )
393        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
394}
395
396static void
397torrentDestructor( void * vt )
398{
399    Torrent * t = vt;
400    uint8_t   hash[SHA_DIGEST_LENGTH];
401
402    assert( t );
403    assert( !t->isRunning );
404    assert( t->peers );
405    assert( torrentIsLocked( t ) );
406    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
407    assert( tr_ptrArrayEmpty( t->peers ) );
408
409    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
410
411    tr_timerFree( &t->reconnectTimer );
412    tr_timerFree( &t->rechokeTimer );
413    tr_timerFree( &t->refillTimer );
414
415    tr_ptrArrayFree( t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
416    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
417    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
418    tr_ptrArrayFree( t->peers, NULL );
419
420    tr_free( t->pendingRequestCount );
421    tr_free( t );
422}
423
424static void peerCallbackFunc( void * vpeer,
425                              void * vevent,
426                              void * vt );
427
428static Torrent*
429torrentConstructor( tr_peerMgr * manager,
430                    tr_torrent * tor )
431{
432    int       i;
433    Torrent * t;
434
435    t = tr_new0( Torrent, 1 );
436    t->manager = manager;
437    t->tor = tor;
438    t->pool = tr_ptrArrayNew( );
439    t->peers = tr_ptrArrayNew( );
440    t->webseeds = tr_ptrArrayNew( );
441    t->outgoingHandshakes = tr_ptrArrayNew( );
442    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
443
444    for( i = 0; i < tor->info.webseedCount; ++i )
445    {
446        tr_webseed * w =
447            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc,
448                           t );
449        tr_ptrArrayAppend( t->webseeds, w );
450    }
451
452    return t;
453}
454
455
456static int bandwidthPulse( void * vmgr );
457
458
459tr_peerMgr*
460tr_peerMgrNew( tr_session * session )
461{
462    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
463
464    m->session = session;
465    m->torrents = tr_ptrArrayNew( );
466    m->incomingHandshakes = tr_ptrArrayNew( );
467    m->finishedHandshakes = tr_ptrArrayNew( );
468    m->bandwidthTimer = tr_timerNew( session, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
469    return m;
470}
471
472void
473tr_peerMgrFree( tr_peerMgr * manager )
474{
475    tr_handshake * handshake;
476
477    managerLock( manager );
478
479    tr_timerFree( &manager->bandwidthTimer );
480
481    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
482     * the item from manager->handshakes, so this is a little roundabout... */
483    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
484        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
485
486    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
487
488    while(( handshake = tr_ptrArrayPop( manager->finishedHandshakes )))
489        tr_handshakeFree( handshake );
490
491    tr_ptrArrayFree( manager->finishedHandshakes, NULL );
492
493    /* free the torrents. */
494    tr_ptrArrayFree( manager->torrents, torrentDestructor );
495
496    managerUnlock( manager );
497    tr_free( manager );
498}
499
500static tr_peer**
501getConnectedPeers( Torrent * t, int * setmeCount )
502{
503    int i, peerCount, connectionCount;
504    tr_peer **peers;
505    tr_peer **ret;
506
507    assert( torrentIsLocked( t ) );
508
509    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
510    ret = tr_new( tr_peer *, peerCount );
511
512    for( i = connectionCount = 0; i < peerCount; ++i )
513        if( peers[i]->msgs )
514            ret[connectionCount++] = peers[i];
515
516    *setmeCount = connectionCount;
517    return ret;
518}
519
520static int
521clientIsDownloadingFrom( const tr_peer * peer )
522{
523    return peer->clientIsInterested && !peer->clientIsChoked;
524}
525
526static int
527clientIsUploadingTo( const tr_peer * peer )
528{
529    return peer->peerIsInterested && !peer->peerIsChoked;
530}
531
532/***
533****
534***/
535
536tr_bool
537tr_peerMgrPeerIsSeed( const tr_peerMgr   * mgr,
538                      const uint8_t      * torrentHash,
539                      const tr_address   * addr )
540{
541    tr_bool isSeed = FALSE;
542    const Torrent * t = NULL;
543    const struct peer_atom * atom = NULL;
544
545    t = getExistingTorrent( (tr_peerMgr*)mgr, torrentHash );
546    if( t )
547        atom = getExistingAtom( t, addr );
548    if( atom )
549        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
550
551    return isSeed;
552}
553
554/****
555*****
556*****  REFILL
557*****
558****/
559
560static void
561assertValidPiece( Torrent * t, tr_piece_index_t piece )
562{
563    assert( t );
564    assert( t->tor );
565    assert( piece < t->tor->info.pieceCount );
566}
567
568static int
569getPieceRequests( Torrent * t, tr_piece_index_t piece )
570{
571    assertValidPiece( t, piece );
572
573    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
574}
575
576static void
577incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
578{
579    assertValidPiece( t, piece );
580
581    if( t->pendingRequestCount == NULL )
582        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
583    t->pendingRequestCount[piece]++;
584}
585
586static void
587decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
588{
589    assertValidPiece( t, piece );
590
591    if( t->pendingRequestCount )
592        t->pendingRequestCount[piece]--;
593}
594
595struct tr_refill_piece
596{
597    tr_priority_t    priority;
598    uint32_t         piece;
599    uint32_t         peerCount;
600    int              random;
601    int              pendingRequestCount;
602    int              missingBlockCount;
603};
604
605static int
606compareRefillPiece( const void * aIn, const void * bIn )
607{
608    const struct tr_refill_piece * a = aIn;
609    const struct tr_refill_piece * b = bIn;
610
611    /* if one piece has a higher priority, it goes first */
612    if( a->priority != b->priority )
613        return a->priority > b->priority ? -1 : 1;
614
615    /* have a per-priority endgame */
616    if( a->pendingRequestCount != b->pendingRequestCount )
617        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
618
619    /* fewer missing pieces goes first */
620    if( a->missingBlockCount != b->missingBlockCount )
621        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
622
623    /* otherwise if one has fewer peers, it goes first */
624    if( a->peerCount != b->peerCount )
625        return a->peerCount < b->peerCount ? -1 : 1;
626
627    /* otherwise go with our random seed */
628    if( a->random != b->random )
629        return a->random < b->random ? -1 : 1;
630
631    return 0;
632}
633
634static tr_piece_index_t *
635getPreferredPieces( Torrent * t, tr_piece_index_t  * pieceCount )
636{
637    const tr_torrent  * tor = t->tor;
638    const tr_info     * inf = &tor->info;
639    tr_piece_index_t    i;
640    tr_piece_index_t    poolSize = 0;
641    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
642    int                 peerCount;
643    tr_peer**           peers;
644
645    assert( torrentIsLocked( t ) );
646
647    peers = getConnectedPeers( t, &peerCount );
648
649    /* make a list of the pieces that we want but don't have */
650    for( i = 0; i < inf->pieceCount; ++i )
651        if( !tor->info.pieces[i].dnd
652                && !tr_cpPieceIsComplete( tor->completion, i ) )
653            pool[poolSize++] = i;
654
655    /* sort the pool by which to request next */
656    if( poolSize > 1 )
657    {
658        tr_piece_index_t j;
659        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
660
661        for( j = 0; j < poolSize; ++j )
662        {
663            int k;
664            const tr_piece_index_t piece = pool[j];
665            struct tr_refill_piece * setme = p + j;
666
667            setme->piece = piece;
668            setme->priority = inf->pieces[piece].priority;
669            setme->peerCount = 0;
670            setme->random = tr_cryptoWeakRandInt( INT_MAX );
671            setme->pendingRequestCount = getPieceRequests( t, piece );
672            setme->missingBlockCount
673                         = tr_cpMissingBlocksInPiece( tor->completion, piece );
674
675            for( k = 0; k < peerCount; ++k )
676            {
677                const tr_peer * peer = peers[k];
678                if( peer->peerIsInterested
679                        && !peer->clientIsChoked
680                        && tr_bitfieldHas( peer->have, piece ) )
681                    ++setme->peerCount;
682            }
683        }
684
685        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
686               compareRefillPiece );
687
688        for( j = 0; j < poolSize; ++j )
689            pool[j] = p[j].piece;
690
691        tr_free( p );
692    }
693
694    tr_free( peers );
695
696    *pieceCount = poolSize;
697    return pool;
698}
699
700struct tr_blockIterator
701{
702    Torrent * t;
703    tr_block_index_t blockIndex, blockCount, *blocks;
704    tr_piece_index_t pieceIndex, pieceCount, *pieces;
705};
706
707static struct tr_blockIterator*
708blockIteratorNew( Torrent * t )
709{
710    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
711    i->t = t;
712    i->pieces = getPreferredPieces( t, &i->pieceCount );
713    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCount );
714    return i;
715}
716
717static int
718blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
719{
720    int found;
721    Torrent * t = i->t;
722    tr_torrent * tor = t->tor;
723
724    while( ( i->blockIndex == i->blockCount )
725        && ( i->pieceIndex < i->pieceCount ) )
726    {
727        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
728        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
729        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
730        tr_block_index_t block;
731
732        assert( index < tor->info.pieceCount );
733
734        i->blockCount = 0;
735        i->blockIndex = 0;
736        for( block=b; block!=e; ++block )
737            if( !tr_cpBlockIsComplete( tor->completion, block ) )
738                i->blocks[i->blockCount++] = block;
739    }
740
741    if(( found = ( i->blockIndex < i->blockCount )))
742        *setme = i->blocks[i->blockIndex++];
743
744    return found;
745}
746
747static void
748blockIteratorFree( struct tr_blockIterator * i )
749{
750    tr_free( i->blocks );
751    tr_free( i->pieces );
752    tr_free( i );
753}
754
755static tr_peer**
756getPeersUploadingToClient( Torrent * t,
757                           int *     setmeCount )
758{
759    int j;
760    int peerCount = 0;
761    int retCount = 0;
762    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
763    tr_peer ** ret = tr_new( tr_peer *, peerCount );
764
765    j = 0; /* this is a temporary test to make sure we walk through all the peers */
766    if( peerCount )
767    {
768        /* Get a list of peers we're downloading from.
769           Pick a different starting point each time so all peers
770           get a chance at being the first in line */
771        const int fencepost = tr_cryptoWeakRandInt( peerCount );
772        int i = fencepost;
773        do {
774            if( clientIsDownloadingFrom( peers[i] ) )
775                ret[retCount++] = peers[i];
776            i = ( i + 1 ) % peerCount;
777            ++j;
778        } while( i != fencepost );
779    }
780    assert( j == peerCount );
781    *setmeCount = retCount;
782    return ret;
783}
784
785static uint32_t
786getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
787{
788    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
789    const uint64_t blockPos = tor->blockSize * b;
790    assert( blockPos >= piecePos );
791    return (uint32_t)( blockPos - piecePos );
792}
793
794static int
795refillPulse( void * vtorrent )
796{
797    tr_block_index_t block;
798    int peerCount;
799    int webseedCount;
800    tr_peer ** peers;
801    tr_webseed ** webseeds;
802    struct tr_blockIterator * blockIterator;
803    Torrent * t = vtorrent;
804    tr_torrent * tor = t->tor;
805
806    if( !t->isRunning )
807        return TRUE;
808    if( tr_torrentIsSeed( t->tor ) )
809        return TRUE;
810
811    torrentLock( t );
812    tordbg( t, "Refilling Request Buffers..." );
813
814    blockIterator = blockIteratorNew( t );
815    peers = getPeersUploadingToClient( t, &peerCount );
816    webseedCount = tr_ptrArraySize( t->webseeds );
817    webseeds = tr_memdup( tr_ptrArrayBase( t->webseeds ),
818                          webseedCount * sizeof( tr_webseed* ) );
819
820    while( ( webseedCount || peerCount )
821        && blockIteratorNext( blockIterator, &block ) )
822    {
823        int j;
824        int handled = FALSE;
825
826        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
827        const uint32_t offset = getBlockOffsetInPiece( tor, block );
828        const uint32_t length = tr_torBlockCountBytes( tor, block );
829
830        /* find a peer who can ask for this block */
831        for( j=0; !handled && j<peerCount; )
832        {
833            const tr_addreq_t val = tr_peerMsgsAddRequest( peers[j]->msgs, index, offset, length, FALSE );
834            switch( val )
835            {
836                case TR_ADDREQ_FULL:
837                case TR_ADDREQ_CLIENT_CHOKED:
838                    peers[j] = peers[--peerCount];
839                    break;
840
841                case TR_ADDREQ_MISSING:
842                case TR_ADDREQ_DUPLICATE:
843                    ++j;
844                    break;
845
846                case TR_ADDREQ_OK:
847                    incrementPieceRequests( t, index );
848                    handled = TRUE;
849                    break;
850
851                default:
852                    assert( 0 && "unhandled value" );
853                    break;
854            }
855        }
856
857        /* maybe one of the webseeds can do it */
858        for( j=0; !handled && j<webseedCount; )
859        {
860            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j],
861                                                          index, offset, length );
862            switch( val )
863            {
864                case TR_ADDREQ_FULL:
865                    webseeds[j] = webseeds[--webseedCount];
866                    break;
867
868                case TR_ADDREQ_OK:
869                    incrementPieceRequests( t, index );
870                    handled = TRUE;
871                    break;
872
873                default:
874                    assert( 0 && "unhandled value" );
875                    break;
876            }
877        }
878    }
879
880    /* cleanup */
881    blockIteratorFree( blockIterator );
882    tr_free( webseeds );
883    tr_free( peers );
884
885    t->refillTimer = NULL;
886    torrentUnlock( t );
887    return FALSE;
888}
889
890static void
891broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
892{
893    int i, size;
894    tr_peer ** peers;
895
896    assert( torrentIsLocked( t ) );
897
898    tordbg( t, "got a block; cancelling any duplicate requests from peers %"PRIu32":%"PRIu32"->%"PRIu32, index, offset, length );
899    peers = getConnectedPeers( t, &size );
900    for( i=0; i<size; ++i )
901        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
902    tr_free( peers );
903}
904
905static void
906addStrike( Torrent * t,
907           tr_peer * peer )
908{
909    tordbg( t, "increasing peer %s strike count to %d",
910            tr_peerIoAddrStr( &peer->addr,
911                              peer->port ), peer->strikes + 1 );
912
913    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
914    {
915        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
916        atom->myflags |= MYFLAG_BANNED;
917        peer->doPurge = 1;
918        tordbg( t, "banning peer %s",
919               tr_peerIoAddrStr( &atom->addr, atom->port ) );
920    }
921}
922
923static void
924gotBadPiece( Torrent *        t,
925             tr_piece_index_t pieceIndex )
926{
927    tr_torrent *   tor = t->tor;
928    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
929
930    tor->corruptCur += byteCount;
931    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
932}
933
934static void
935refillSoon( Torrent * t )
936{
937    if( t->refillTimer == NULL )
938        t->refillTimer = tr_timerNew( t->manager->session,
939                                      refillPulse, t,
940                                      REFILL_PERIOD_MSEC );
941}
942
943static void
944peerSuggestedPiece( Torrent            * t,
945                    tr_peer            * peer,
946                    tr_piece_index_t     pieceIndex,
947                    int                  isFastAllowed )
948{
949    assert( t );
950    assert( peer );
951    assert( peer->msgs );
952
953    /* is this a valid piece? */
954    if(  pieceIndex >= t->tor->info.pieceCount )
955        return;
956
957    /* don't ask for it if we've already got it */
958    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
959        return;
960
961    /* don't ask for it if they don't have it */
962    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
963        return;
964
965    /* don't ask for it if we're choked and it's not fast */
966    if( !isFastAllowed && peer->clientIsChoked )
967        return;
968
969    /* request the blocks that we don't have in this piece */
970    {
971        tr_block_index_t block;
972        const tr_torrent * tor = t->tor;
973        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
974        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
975
976        for( block=start; block<end; ++block )
977        {
978            if( !tr_cpBlockIsComplete( tor->completion, block ) )
979            {
980                const uint32_t offset = getBlockOffsetInPiece( tor, block );
981                const uint32_t length = tr_torBlockCountBytes( tor, block );
982                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length, TRUE );
983                incrementPieceRequests( t, pieceIndex );
984            }
985        }
986    }
987}
988
989static void
990peerCallbackFunc( void * vpeer,
991                  void * vevent,
992                  void * vt )
993{
994    tr_peer             * peer = vpeer; /* may be NULL if peer is a webseed */
995    Torrent             * t = vt;
996    const tr_peer_event * e = vevent;
997
998    torrentLock( t );
999
1000    switch( e->eventType )
1001    {
1002        case TR_PEER_UPLOAD_ONLY:
1003            /* update our atom */
1004            if( peer ) {
1005                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1006                a->uploadOnly = e->uploadOnly ? UPLOAD_ONLY_YES : UPLOAD_ONLY_NO;
1007            }
1008            break;
1009
1010        case TR_PEER_NEED_REQ:
1011            refillSoon( t );
1012            break;
1013
1014        case TR_PEER_CANCEL:
1015            decrementPieceRequests( t, e->pieceIndex );
1016            break;
1017
1018        case TR_PEER_PEER_GOT_DATA:
1019        {
1020            const time_t now = time( NULL );
1021            tr_torrent * tor = t->tor;
1022
1023            tor->activityDate = now;
1024
1025            if( e->wasPieceData )
1026                tor->uploadedCur += e->length;
1027
1028            /* update the stats */
1029            if( e->wasPieceData )
1030                tr_statsAddUploaded( tor->session, e->length );
1031
1032            /* update our atom */
1033            if( peer ) {
1034                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1035                if( e->wasPieceData )
1036                    a->piece_data_time = now;
1037            }
1038
1039            break;
1040        }
1041
1042        case TR_PEER_CLIENT_GOT_SUGGEST:
1043            if( peer )
1044                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1045            break;
1046
1047        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1048            if( peer )
1049                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1050            break;
1051
1052        case TR_PEER_CLIENT_GOT_DATA:
1053        {
1054            const time_t now = time( NULL );
1055            tr_torrent * tor = t->tor;
1056            tor->activityDate = now;
1057
1058            /* only add this to downloadedCur if we got it from a peer --
1059             * webseeds shouldn't count against our ratio.  As one tracker
1060             * admin put it, "Those pieces are downloaded directly from the
1061             * content distributor, not the peers, it is the tracker's job
1062             * to manage the swarms, not the web server and does not fit
1063             * into the jurisdiction of the tracker." */
1064            if( peer && e->wasPieceData )
1065                tor->downloadedCur += e->length;
1066
1067            /* update the stats */ 
1068            if( e->wasPieceData )
1069                tr_statsAddDownloaded( tor->session, e->length );
1070
1071            /* update our atom */
1072            if( peer ) {
1073                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1074                if( e->wasPieceData )
1075                    a->piece_data_time = now;
1076            }
1077
1078            break;
1079        }
1080
1081        case TR_PEER_PEER_PROGRESS:
1082        {
1083            if( peer )
1084            {
1085                struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1086                const int peerIsSeed = e->progress >= 1.0;
1087                if( peerIsSeed ) {
1088                    tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1089                    atom->flags |= ADDED_F_SEED_FLAG;
1090                } else {
1091                    tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1092                    atom->flags &= ~ADDED_F_SEED_FLAG;
1093                }
1094            }
1095            break;
1096        }
1097
1098        case TR_PEER_CLIENT_GOT_BLOCK:
1099        {
1100            tr_torrent *     tor = t->tor;
1101
1102            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1103
1104            tr_cpBlockAdd( tor->completion, block );
1105            decrementPieceRequests( t, e->pieceIndex );
1106
1107            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1108
1109            if( tr_cpPieceIsComplete( tor->completion, e->pieceIndex ) )
1110            {
1111                const tr_piece_index_t p = e->pieceIndex;
1112                const tr_bool ok = tr_ioTestPiece( tor, p );
1113
1114                if( !ok )
1115                {
1116                    tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1117                               (unsigned long)p );
1118                }
1119
1120                tr_torrentSetHasPiece( tor, p, ok );
1121                tr_torrentSetPieceChecked( tor, p, TRUE );
1122                tr_peerMgrSetBlame( tor->session->peerMgr, tor->info.hash, p, ok );
1123
1124                if( !ok )
1125                    gotBadPiece( t, p );
1126                else
1127                {
1128                    int i, peerCount;
1129                    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1130                    for( i = 0; i < peerCount; ++i )
1131                        tr_peerMsgsHave( peers[i]->msgs, p );
1132                    tr_free( peers );
1133                }
1134
1135                tr_torrentRecheckCompleteness( tor );
1136            }
1137            break;
1138        }
1139
1140        case TR_PEER_ERROR:
1141            if( e->err == EINVAL )
1142            {
1143                addStrike( t, peer );
1144                peer->doPurge = 1;
1145                tordbg( t, "setting doPurge because we got an EINVAL error" );
1146            }
1147            else if( ( e->err == ERANGE )
1148                  || ( e->err == EMSGSIZE )
1149                  || ( e->err == ENOTCONN ) )
1150            {
1151                /* some protocol error from the peer */
1152                peer->doPurge = 1;
1153                tordbg( t, "setting doPurge because we got an ERANGE, EMSGSIZE, or ENOTCONN error" );
1154            }
1155            else /* a local error, such as an IO error */
1156            {
1157                t->tor->error = e->err;
1158                tr_strlcpy( t->tor->errorString,
1159                            tr_strerror( t->tor->error ),
1160                            sizeof( t->tor->errorString ) );
1161                tr_torrentStop( t->tor );
1162            }
1163            break;
1164
1165        default:
1166            assert( 0 );
1167    }
1168
1169    torrentUnlock( t );
1170}
1171
1172static void
1173ensureAtomExists( Torrent          * t,
1174                  const tr_address * addr,
1175                  tr_port            port,
1176                  uint8_t            flags,
1177                  uint8_t            from )
1178{
1179    if( getExistingAtom( t, addr ) == NULL )
1180    {
1181        struct peer_atom * a;
1182        a = tr_new0( struct peer_atom, 1 );
1183        a->addr = *addr;
1184        a->port = port;
1185        a->flags = flags;
1186        a->from = from;
1187        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr( &a->addr, a->port ) );
1188        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
1189    }
1190}
1191
1192static int
1193getMaxPeerCount( const tr_torrent * tor )
1194{
1195    return tor->maxConnectedPeers;
1196}
1197
1198static int
1199getPeerCount( const Torrent * t )
1200{
1201    return tr_ptrArraySize( t->peers ) + tr_ptrArraySize( t->outgoingHandshakes );
1202}
1203
1204/* FIXME: this is kind of a mess. */
1205static tr_bool
1206myHandshakeDoneCB( tr_handshake  *  handshake,
1207                   tr_peerIo     * io,
1208                   int             isConnected,
1209                   const uint8_t * peer_id,
1210                   void          * vmanager )
1211{
1212    tr_bool            ok = isConnected;
1213    tr_bool            success = FALSE;
1214    tr_port            port;
1215    const tr_address * addr;
1216    tr_peerMgr       * manager = vmanager;
1217    Torrent          * t;
1218    tr_handshake     * ours;
1219
1220    assert( io );
1221    assert( isConnected == 0 || isConnected == 1 );
1222
1223    t = tr_peerIoHasTorrentHash( io )
1224        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1225        : NULL;
1226
1227    if( tr_peerIoIsIncoming ( io ) )
1228        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
1229                                        handshake, handshakeCompare );
1230    else if( t )
1231        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
1232                                        handshake, handshakeCompare );
1233    else
1234        ours = handshake;
1235
1236    assert( ours );
1237    assert( ours == handshake );
1238
1239    if( t )
1240        torrentLock( t );
1241
1242    addr = tr_peerIoGetAddress( io, &port );
1243
1244    if( !ok || !t || !t->isRunning )
1245    {
1246        if( t )
1247        {
1248            struct peer_atom * atom = getExistingAtom( t, addr );
1249            if( atom )
1250                ++atom->numFails;
1251        }
1252    }
1253    else /* looking good */
1254    {
1255        struct peer_atom * atom;
1256        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1257        atom = getExistingAtom( t, addr );
1258        atom->time = time( NULL );
1259        atom->piece_data_time = 0;
1260
1261        if( atom->myflags & MYFLAG_BANNED )
1262        {
1263            tordbg( t, "banned peer %s tried to reconnect",
1264                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
1265        }
1266        else if( tr_peerIoIsIncoming( io )
1267               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1268
1269        {
1270        }
1271        else
1272        {
1273            tr_peer * peer = getExistingPeer( t, addr );
1274
1275            if( peer ) /* we already have this peer */
1276            {
1277            }
1278            else
1279            {
1280                peer = getPeer( t, addr );
1281                tr_free( peer->client );
1282
1283                if( !peer_id )
1284                    peer->client = NULL;
1285                else {
1286                    char client[128];
1287                    tr_clientForId( client, sizeof( client ), peer_id );
1288                    peer->client = tr_strdup( client );
1289                }
1290
1291                peer->port = port;
1292                peer->io = tr_handshakeStealIO( handshake );
1293                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1294                tr_peerIoSetBandwidth( io, peer->bandwidth );
1295
1296                success = TRUE;
1297            }
1298        }
1299    }
1300
1301    if( !success )
1302        tr_ptrArrayAppend( manager->finishedHandshakes, handshake );
1303
1304    if( t )
1305        torrentUnlock( t );
1306
1307    return success;
1308}
1309
1310void
1311tr_peerMgrAddIncoming( tr_peerMgr * manager,
1312                       tr_address * addr,
1313                       tr_port      port,
1314                       int          socket )
1315{
1316    managerLock( manager );
1317
1318    if( tr_sessionIsAddressBlocked( manager->session, addr ) )
1319    {
1320        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
1321        tr_netClose( socket );
1322    }
1323    else if( getExistingHandshake( manager->incomingHandshakes, addr ) )
1324    {
1325        tr_netClose( socket );
1326    }
1327    else /* we don't have a connetion to them yet... */
1328    {
1329        tr_peerIo *    io;
1330        tr_handshake * handshake;
1331
1332        io = tr_peerIoNewIncoming( manager->session, addr, port, socket );
1333
1334        handshake = tr_handshakeNew( io,
1335                                     manager->session->encryptionMode,
1336                                     myHandshakeDoneCB,
1337                                     manager );
1338
1339        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake,
1340                                 handshakeCompare );
1341    }
1342
1343    managerUnlock( manager );
1344}
1345
1346static int
1347tr_isPex( const tr_pex * pex )
1348{
1349    return pex && tr_isAddress( &pex->addr );
1350}
1351
1352void
1353tr_peerMgrAddPex( tr_peerMgr *    manager,
1354                  const uint8_t * torrentHash,
1355                  uint8_t         from,
1356                  const tr_pex *  pex )
1357{
1358    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
1359    {
1360        Torrent * t;
1361        managerLock( manager );
1362
1363
1364        t = getExistingTorrent( manager, torrentHash );
1365        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
1366            ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from );
1367   
1368        managerUnlock( manager );
1369    }
1370}
1371
1372tr_pex *
1373tr_peerMgrCompactToPex( const void *    compact,
1374                        size_t          compactLen,
1375                        const uint8_t * added_f,
1376                        size_t          added_f_len,
1377                        size_t *        pexCount )
1378{
1379    size_t          i;
1380    size_t          n = compactLen / 6;
1381    const uint8_t * walk = compact;
1382    tr_pex *        pex = tr_new0( tr_pex, n );
1383
1384    for( i = 0; i < n; ++i )
1385    {
1386        pex[i].addr.type = TR_AF_INET;
1387        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
1388        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1389        if( added_f && ( n == added_f_len ) )
1390            pex[i].flags = added_f[i];
1391    }
1392
1393    *pexCount = n;
1394    return pex;
1395}
1396
1397tr_pex *
1398tr_peerMgrCompact6ToPex( const void    * compact,
1399                         size_t          compactLen,
1400                         const uint8_t * added_f,
1401                         size_t          added_f_len,
1402                         size_t        * pexCount )
1403{
1404    size_t          i;
1405    size_t          n = compactLen / 18;
1406    const uint8_t * walk = compact;
1407    tr_pex *        pex = tr_new0( tr_pex, n );
1408   
1409    for( i = 0; i < n; ++i )
1410    {
1411        pex[i].addr.type = TR_AF_INET6;
1412        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
1413        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1414        if( added_f && ( n == added_f_len ) )
1415            pex[i].flags = added_f[i];
1416    }
1417   
1418    *pexCount = n;
1419    return pex;
1420}
1421
1422tr_pex *
1423tr_peerMgrArrayToPex( const void * array,
1424                      size_t       arrayLen,
1425                      size_t      * pexCount )
1426{
1427    size_t          i;
1428    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
1429    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
1430    const uint8_t * walk = array;
1431    tr_pex        * pex = tr_new0( tr_pex, n );
1432   
1433    for( i = 0 ; i < n ; i++ ) {
1434        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
1435        tr_suspectAddress( &pex[i].addr, "tracker"  );
1436        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
1437        pex[i].flags = 0x00;
1438        walk += sizeof( tr_address ) + 2;
1439    }
1440   
1441    *pexCount = n;
1442    return pex;
1443}
1444
1445/**
1446***
1447**/
1448
1449void
1450tr_peerMgrSetBlame( tr_peerMgr *     manager,
1451                    const uint8_t *  torrentHash,
1452                    tr_piece_index_t pieceIndex,
1453                    int              success )
1454{
1455    if( !success )
1456    {
1457        int        peerCount, i;
1458        Torrent *  t = getExistingTorrent( manager, torrentHash );
1459        tr_peer ** peers;
1460
1461        assert( torrentIsLocked( t ) );
1462
1463        peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1464        for( i = 0; i < peerCount; ++i )
1465        {
1466            tr_peer * peer = peers[i];
1467            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1468            {
1469                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1470                        tr_peerIoAddrStr( &peer->addr, peer->port ),
1471                        pieceIndex, (int)peer->strikes + 1 );
1472                addStrike( t, peer );
1473            }
1474        }
1475    }
1476}
1477
1478int
1479tr_pexCompare( const void * va, const void * vb )
1480{
1481    const tr_pex * a = va;
1482    const tr_pex * b = vb;
1483    int i;
1484
1485    assert( tr_isPex( a ) );
1486    assert( tr_isPex( b ) );
1487
1488    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
1489        return i;
1490
1491    if( a->port != b->port )
1492        return a->port < b->port ? -1 : 1;
1493
1494    return 0;
1495}
1496
1497static int
1498peerPrefersCrypto( const tr_peer * peer )
1499{
1500    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1501        return TRUE;
1502
1503    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1504        return FALSE;
1505
1506    return tr_peerIoIsEncrypted( peer->io );
1507}
1508
1509int
1510tr_peerMgrGetPeers( tr_peerMgr      * manager,
1511                    const uint8_t   * torrentHash,
1512                    tr_pex         ** setme_pex,
1513                    uint8_t           af)
1514{
1515    int peerCount = 0;
1516    int peersReturning = 0;
1517    const Torrent *  t;
1518
1519    managerLock( manager );
1520
1521    t = getExistingTorrent( manager, torrentHash );
1522    if( t == NULL )
1523    {
1524        *setme_pex = NULL;
1525    }
1526    else
1527    {
1528        int i;
1529        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1530        /* for now, this will waste memory on torrents that have both
1531         * ipv6 and ipv4 peers */
1532        tr_pex * pex = tr_new( tr_pex, peerCount );
1533        tr_pex * walk = pex;
1534
1535        for( i=0; i<peerCount; ++i )
1536        {
1537            const tr_peer * peer = peers[i];
1538            if( peer->addr.type == af )
1539            {
1540                const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1541                assert( tr_isAddress( &peer->addr ) );
1542                walk->addr = peer->addr;
1543                walk->port = peer->port;
1544                walk->flags = 0;
1545                if( peerPrefersCrypto( peer ) )
1546                    walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1547                if( ( atom->uploadOnly == UPLOAD_ONLY_YES ) ||
1548                    ( peer->progress >= 1.0 ) )
1549                    walk->flags |= ADDED_F_SEED_FLAG;
1550                ++peersReturning;
1551                ++walk;
1552            }
1553        }
1554
1555        assert( ( walk - pex ) == peersReturning );
1556        qsort( pex, peersReturning, sizeof( tr_pex ), tr_pexCompare );
1557
1558        *setme_pex = pex;
1559    }
1560
1561    managerUnlock( manager );
1562    return peersReturning;
1563}
1564
1565static int reconnectPulse( void * vtorrent );
1566
1567static int rechokePulse( void * vtorrent );
1568
1569void
1570tr_peerMgrStartTorrent( tr_peerMgr *    manager,
1571                        const uint8_t * torrentHash )
1572{
1573    Torrent * t;
1574
1575    managerLock( manager );
1576
1577    t = getExistingTorrent( manager, torrentHash );
1578
1579    assert( t );
1580    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1581    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1582
1583    if( !t->isRunning )
1584    {
1585        t->isRunning = 1;
1586
1587        t->reconnectTimer = tr_timerNew( t->manager->session,
1588                                         reconnectPulse, t,
1589                                         RECONNECT_PERIOD_MSEC );
1590
1591        t->rechokeTimer = tr_timerNew( t->manager->session,
1592                                       rechokePulse, t,
1593                                       RECHOKE_PERIOD_MSEC );
1594
1595        reconnectPulse( t );
1596
1597        rechokePulse( t );
1598
1599        if( !tr_ptrArrayEmpty( t->webseeds ) )
1600            refillSoon( t );
1601    }
1602
1603    managerUnlock( manager );
1604}
1605
1606static void
1607stopTorrent( Torrent * t )
1608{
1609    assert( torrentIsLocked( t ) );
1610
1611    t->isRunning = 0;
1612    tr_timerFree( &t->rechokeTimer );
1613    tr_timerFree( &t->reconnectTimer );
1614
1615    /* disconnect the peers. */
1616    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)peerDestructor );
1617    tr_ptrArrayClear( t->peers );
1618
1619    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1620     * which removes the handshake from t->outgoingHandshakes... */
1621    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1622        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1623}
1624
1625void
1626tr_peerMgrStopTorrent( tr_peerMgr *    manager,
1627                       const uint8_t * torrentHash )
1628{
1629    managerLock( manager );
1630
1631    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1632
1633    managerUnlock( manager );
1634}
1635
1636void
1637tr_peerMgrAddTorrent( tr_peerMgr * manager,
1638                      tr_torrent * tor )
1639{
1640    Torrent * t;
1641
1642    managerLock( manager );
1643
1644    assert( tor );
1645    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1646
1647    t = torrentConstructor( manager, tor );
1648    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1649
1650    managerUnlock( manager );
1651}
1652
1653void
1654tr_peerMgrRemoveTorrent( tr_peerMgr *    manager,
1655                         const uint8_t * torrentHash )
1656{
1657    Torrent * t;
1658
1659    managerLock( manager );
1660
1661    t = getExistingTorrent( manager, torrentHash );
1662    assert( t );
1663    stopTorrent( t );
1664    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1665    torrentDestructor( t );
1666
1667    managerUnlock( manager );
1668}
1669
1670void
1671tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1672                               const uint8_t *    torrentHash,
1673                               int8_t *           tab,
1674                               unsigned int       tabCount )
1675{
1676    tr_piece_index_t   i;
1677    const Torrent *    t;
1678    const tr_torrent * tor;
1679    float              interval;
1680    int                isSeed;
1681    int                peerCount;
1682    const tr_peer **   peers;
1683
1684    managerLock( manager );
1685
1686    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1687    tor = t->tor;
1688    interval = tor->info.pieceCount / (float)tabCount;
1689    isSeed = tor && ( tr_cpGetStatus ( tor->completion ) == TR_SEED );
1690    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1691
1692    memset( tab, 0, tabCount );
1693
1694    for( i = 0; tor && i < tabCount; ++i )
1695    {
1696        const int piece = i * interval;
1697
1698        if( isSeed || tr_cpPieceIsComplete( tor->completion, piece ) )
1699            tab[i] = -1;
1700        else if( peerCount )
1701        {
1702            int j;
1703            for( j = 0; j < peerCount; ++j )
1704                if( tr_bitfieldHas( peers[j]->have, i ) )
1705                    ++tab[i];
1706        }
1707    }
1708
1709    managerUnlock( manager );
1710}
1711
1712/* Returns the pieces that are available from peers */
1713tr_bitfield*
1714tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1715                        const uint8_t *    torrentHash )
1716{
1717    int           i, size;
1718    Torrent *     t;
1719    tr_peer **    peers;
1720    tr_bitfield * pieces;
1721    managerLock( manager );
1722
1723    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1724    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1725    peers = getConnectedPeers( t, &size );
1726    for( i = 0; i < size; ++i )
1727        tr_bitfieldOr( pieces, peers[i]->have );
1728
1729    managerUnlock( manager );
1730    tr_free( peers );
1731    return pieces;
1732}
1733
1734int
1735tr_peerMgrHasConnections( const tr_peerMgr * manager,
1736                          const uint8_t *    torrentHash )
1737{
1738    int ret;
1739    const Torrent * t;
1740    managerLock( manager );
1741
1742    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1743    ret = t && ( !tr_ptrArrayEmpty( t->peers ) || !tr_ptrArrayEmpty( t->webseeds ) );
1744
1745    managerUnlock( manager );
1746    return ret;
1747}
1748
1749void
1750tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1751                        const uint8_t    * torrentHash,
1752                        int              * setmePeersKnown,
1753                        int              * setmePeersConnected,
1754                        int              * setmeSeedsConnected,
1755                        int              * setmeWebseedsSendingToUs,
1756                        int              * setmePeersSendingToUs,
1757                        int              * setmePeersGettingFromUs,
1758                        int              * setmePeersFrom )
1759{
1760    int i, size;
1761    const Torrent * t;
1762    const tr_peer ** peers;
1763    const tr_webseed ** webseeds;
1764
1765    managerLock( manager );
1766
1767    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1768    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1769
1770    *setmePeersKnown           = tr_ptrArraySize( t->pool );
1771    *setmePeersConnected       = 0;
1772    *setmeSeedsConnected       = 0;
1773    *setmePeersGettingFromUs   = 0;
1774    *setmePeersSendingToUs     = 0;
1775    *setmeWebseedsSendingToUs  = 0;
1776
1777    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1778        setmePeersFrom[i] = 0;
1779
1780    for( i=0; i<size; ++i )
1781    {
1782        const tr_peer * peer = peers[i];
1783        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1784
1785        if( peer->io == NULL ) /* not connected */
1786            continue;
1787
1788        ++*setmePeersConnected;
1789
1790        ++setmePeersFrom[atom->from];
1791
1792        if( clientIsDownloadingFrom( peer ) )
1793            ++*setmePeersSendingToUs;
1794
1795        if( clientIsUploadingTo( peer ) )
1796            ++*setmePeersGettingFromUs;
1797
1798        if( atom->flags & ADDED_F_SEED_FLAG )
1799            ++*setmeSeedsConnected;
1800    }
1801
1802    webseeds = (const tr_webseed **) tr_ptrArrayPeek( t->webseeds, &size );
1803    for( i=0; i<size; ++i )
1804    {
1805        if( tr_webseedIsActive( webseeds[i] ) )
1806            ++*setmeWebseedsSendingToUs;
1807    }
1808
1809    managerUnlock( manager );
1810}
1811
1812float*
1813tr_peerMgrWebSpeeds( const tr_peerMgr * manager,
1814                     const uint8_t *    torrentHash )
1815{
1816    const Torrent * t;
1817    const tr_webseed ** webseeds;
1818    int i;
1819    int webseedCount;
1820    float * ret;
1821
1822    assert( manager );
1823    managerLock( manager );
1824
1825    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1826    webseeds = (const tr_webseed**) tr_ptrArrayPeek( t->webseeds,
1827                                                     &webseedCount );
1828    assert( webseedCount == t->tor->info.webseedCount );
1829    ret = tr_new0( float, webseedCount );
1830
1831    for( i=0; i<webseedCount; ++i )
1832        if( !tr_webseedGetSpeed( webseeds[i], &ret[i] ) )
1833            ret[i] = -1.0;
1834
1835    managerUnlock( manager );
1836    return ret;
1837}
1838
1839double
1840tr_peerGetPieceSpeed( const tr_peer * peer, tr_direction direction )
1841{
1842    assert( peer );
1843    assert( direction==TR_CLIENT_TO_PEER || direction==TR_PEER_TO_CLIENT );
1844
1845    return tr_bandwidthGetPieceSpeed( peer->bandwidth, direction );
1846}
1847
1848
1849struct tr_peer_stat *
1850tr_peerMgrPeerStats( const   tr_peerMgr  * manager,
1851                     const   uint8_t     * torrentHash,
1852                     int                 * setmeCount UNUSED )
1853{
1854    int             i, size;
1855    const Torrent * t;
1856    tr_peer **      peers;
1857    tr_peer_stat *  ret;
1858
1859    assert( manager );
1860    managerLock( manager );
1861
1862    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1863    peers = getConnectedPeers( (Torrent*)t, &size );
1864    ret = tr_new0( tr_peer_stat, size );
1865
1866    for( i = 0; i < size; ++i )
1867    {
1868        char *                   pch;
1869        const tr_peer *          peer = peers[i];
1870        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1871        tr_peer_stat *           stat = ret + i;
1872        tr_address               norm_addr;
1873
1874        memcpy( &norm_addr, &peer->addr, sizeof( tr_address ) );
1875        tr_normalizeV4Mapped( &norm_addr );
1876        tr_ntop( &norm_addr, stat->addr, sizeof( stat->addr ) );
1877        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
1878                   sizeof( stat->client ) );
1879        stat->port               = ntohs( peer->port );
1880        stat->from               = atom->from;
1881        stat->progress           = peer->progress;
1882        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1883        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER );
1884        stat->rateToClient       = tr_peerGetPieceSpeed( peer, TR_PEER_TO_CLIENT );
1885        stat->peerIsChoked       = peer->peerIsChoked;
1886        stat->peerIsInterested   = peer->peerIsInterested;
1887        stat->clientIsChoked     = peer->clientIsChoked;
1888        stat->clientIsInterested = peer->clientIsInterested;
1889        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1890        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1891        stat->isUploadingTo      = clientIsUploadingTo( peer );
1892        stat->isSeed             = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
1893
1894        pch = stat->flagStr;
1895        if( t->optimistic == peer ) *pch++ = 'O';
1896        if( stat->isDownloadingFrom ) *pch++ = 'D';
1897        else if( stat->clientIsInterested ) *pch++ = 'd';
1898        if( stat->isUploadingTo ) *pch++ = 'U';
1899        else if( stat->peerIsInterested ) *pch++ = 'u';
1900        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
1901        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1902        if( stat->isEncrypted ) *pch++ = 'E';
1903        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1904        if( stat->isIncoming ) *pch++ = 'I';
1905        *pch = '\0';
1906    }
1907
1908    *setmeCount = size;
1909    tr_free( peers );
1910
1911    managerUnlock( manager );
1912    return ret;
1913}
1914
1915/**
1916***
1917**/
1918
1919struct ChokeData
1920{
1921    tr_bool         doUnchoke;
1922    tr_bool         isInterested;
1923    tr_bool         isChoked;
1924    int             rate;
1925    tr_peer *       peer;
1926};
1927
1928static int
1929compareChoke( const void * va,
1930              const void * vb )
1931{
1932    const struct ChokeData * a = va;
1933    const struct ChokeData * b = vb;
1934
1935    if( a->rate != b->rate ) /* prefer higher overall speeds */
1936        return a->rate > b->rate ? -1 : 1;
1937
1938    if( a->isChoked != b->isChoked ) /* prefer unchoked */
1939        return a->isChoked ? 1 : -1;
1940
1941    return 0;
1942}
1943
1944static int
1945isNew( const tr_peer * peer )
1946{
1947    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1948}
1949
1950static int
1951isSame( const tr_peer * peer )
1952{
1953    return peer && peer->client && strstr( peer->client, "Transmission" );
1954}
1955
1956/**
1957***
1958**/
1959
1960static void
1961rechoke( Torrent * t )
1962{
1963    int                i, peerCount, size, unchokedInterested;
1964    tr_peer **         peers = getConnectedPeers( t, &peerCount );
1965    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1966    const int          chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
1967
1968    assert( torrentIsLocked( t ) );
1969
1970    /* sort the peers by preference and rate */
1971    for( i = 0, size = 0; i < peerCount; ++i )
1972    {
1973        tr_peer * peer = peers[i];
1974        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1975
1976        if( peer->progress >= 1.0 ) /* choke all seeds */
1977        {
1978            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1979        }
1980        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
1981        {
1982            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1983        }
1984        else if( chokeAll ) /* choke everyone if we're not uploading */
1985        {
1986            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1987        }
1988        else
1989        {
1990            struct ChokeData * n = &choke[size++];
1991            n->peer         = peer;
1992            n->isInterested = peer->peerIsInterested;
1993            n->isChoked     = peer->peerIsChoked;
1994            n->rate         = tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER ) * 1024;
1995        }
1996    }
1997
1998    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
1999
2000    /**
2001     * Reciprocation and number of uploads capping is managed by unchoking
2002     * the N peers which have the best upload rate and are interested.
2003     * This maximizes the client's download rate. These N peers are
2004     * referred to as downloaders, because they are interested in downloading
2005     * from the client.
2006     *
2007     * Peers which have a better upload rate (as compared to the downloaders)
2008     * but aren't interested get unchoked. If they become interested, the
2009     * downloader with the worst upload rate gets choked. If a client has
2010     * a complete file, it uses its upload rate rather than its download
2011     * rate to decide which peers to unchoke.
2012     */
2013    unchokedInterested = 0;
2014    for( i=0; i<size && unchokedInterested<MAX_UNCHOKED_PEERS; ++i ) {
2015        choke[i].doUnchoke = 1;
2016        if( choke[i].isInterested )
2017            ++unchokedInterested;
2018    }
2019
2020    /* optimistic unchoke */
2021    if( i < size )
2022    {
2023        int n;
2024        struct ChokeData * c;
2025        tr_ptrArray * randPool = tr_ptrArrayNew( );
2026
2027        for( ; i<size; ++i )
2028        {
2029            if( choke[i].isInterested )
2030            {
2031                const tr_peer * peer = choke[i].peer;
2032                int x = 1, y;
2033                if( isNew( peer ) ) x *= 3;
2034                if( isSame( peer ) ) x *= 3;
2035                for( y=0; y<x; ++y )
2036                    tr_ptrArrayAppend( randPool, &choke[i] );
2037            }
2038        }
2039
2040        if(( n = tr_ptrArraySize( randPool )))
2041        {
2042            c = tr_ptrArrayNth( randPool, tr_cryptoWeakRandInt( n ));
2043            c->doUnchoke = 1;
2044            t->optimistic = c->peer;
2045        }
2046
2047        tr_ptrArrayFree( randPool, NULL );
2048    }
2049
2050    for( i = 0; i < size; ++i )
2051        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
2052
2053    /* cleanup */
2054    tr_free( choke );
2055    tr_free( peers );
2056}
2057
2058static int
2059rechokePulse( void * vtorrent )
2060{
2061    Torrent * t = vtorrent;
2062
2063    torrentLock( t );
2064    rechoke( t );
2065    torrentUnlock( t );
2066    return TRUE;
2067}
2068
2069/***
2070****
2071****  Life and Death
2072****
2073***/
2074
2075static int
2076shouldPeerBeClosed( const Torrent * t,
2077                    const tr_peer * peer,
2078                    int             peerCount )
2079{
2080    const tr_torrent *       tor = t->tor;
2081    const time_t             now = time( NULL );
2082    const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
2083
2084    /* if it's marked for purging, close it */
2085    if( peer->doPurge )
2086    {
2087        tordbg( t, "purging peer %s because its doPurge flag is set",
2088                tr_peerIoAddrStr( &atom->addr, atom->port ) );
2089        return TRUE;
2090    }
2091
2092    /* if we're seeding and the peer has everything we have,
2093     * and enough time has passed for a pex exchange, then disconnect */
2094    if( tr_torrentIsSeed( tor ) )
2095    {
2096        int peerHasEverything;
2097        if( atom->flags & ADDED_F_SEED_FLAG )
2098            peerHasEverything = TRUE;
2099        else if( peer->progress < tr_cpPercentDone( tor->completion ) )
2100            peerHasEverything = FALSE;
2101        else {
2102            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( tor->completion ) );
2103            tr_bitfieldDifference( tmp, peer->have );
2104            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2105            tr_bitfieldFree( tmp );
2106        }
2107        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 ))) {
2108            tordbg( t, "purging peer %s because we're both seeds",
2109                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2110            return TRUE;
2111        }
2112    }
2113
2114    /* disconnect if it's been too long since piece data has been transferred.
2115     * this is on a sliding scale based on number of available peers... */
2116    {
2117        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2118        /* if we have >= relaxIfFewerThan, strictness is 100%.
2119         * if we have zero connections, strictness is 0% */
2120        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
2121            ? 1.0
2122            : peerCount / (float)relaxStrictnessIfFewerThanN;
2123        const int lo = MIN_UPLOAD_IDLE_SECS;
2124        const int hi = MAX_UPLOAD_IDLE_SECS;
2125        const int limit = hi - ( ( hi - lo ) * strictness );
2126        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
2127/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
2128        if( idleTime > limit ) {
2129            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
2130                       tr_peerIoAddrStr( &atom->addr, atom->port ), idleTime );
2131            return TRUE;
2132        }
2133    }
2134
2135    return FALSE;
2136}
2137
2138static tr_peer **
2139getPeersToClose( Torrent * t, int * setmeSize )
2140{
2141    int i, peerCount, outsize;
2142    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( t->peers, &peerCount );
2143    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2144
2145    assert( torrentIsLocked( t ) );
2146
2147    for( i = outsize = 0; i < peerCount; ++i )
2148        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
2149            ret[outsize++] = peers[i];
2150
2151    *setmeSize = outsize;
2152    return ret;
2153}
2154
2155static int
2156compareCandidates( const void * va,
2157                   const void * vb )
2158{
2159    const struct peer_atom * a = *(const struct peer_atom**) va;
2160    const struct peer_atom * b = *(const struct peer_atom**) vb;
2161
2162    /* <Charles> Here we would probably want to try reconnecting to
2163     * peers that had most recently given us data. Lots of users have
2164     * trouble with resets due to their routers and/or ISPs. This way we
2165     * can quickly recover from an unwanted reset. So we sort
2166     * piece_data_time in descending order.
2167     */
2168
2169    if( a->piece_data_time != b->piece_data_time )
2170        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2171
2172    if( a->numFails != b->numFails )
2173        return a->numFails < b->numFails ? -1 : 1;
2174
2175    if( a->time != b->time )
2176        return a->time < b->time ? -1 : 1;
2177
2178    /* all other things being equal, prefer peers whose
2179     * information comes from a more reliable source */
2180    if( a->from != b->from )
2181        return a->from < b->from ? -1 : 1;
2182
2183    return 0;
2184}
2185
2186static int
2187getReconnectIntervalSecs( const struct peer_atom * atom )
2188{
2189    int          sec;
2190    const time_t now = time( NULL );
2191
2192    /* if we were recently connected to this peer and transferring piece
2193     * data, try to reconnect to them sooner rather that later -- we don't
2194     * want network troubles to get in the way of a good peer. */
2195    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2196        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2197
2198    /* don't allow reconnects more often than our minimum */
2199    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2200        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2201
2202    /* otherwise, the interval depends on how many times we've tried
2203     * and failed to connect to the peer */
2204    else switch( atom->numFails ) {
2205        case 0: sec = 0; break;
2206        case 1: sec = 5; break;
2207        case 2: sec = 2 * 60; break;
2208        case 3: sec = 15 * 60; break;
2209        case 4: sec = 30 * 60; break;
2210        case 5: sec = 60 * 60; break;
2211        default: sec = 120 * 60; break;
2212    }
2213
2214    return sec;
2215}
2216
2217static struct peer_atom **
2218getPeerCandidates( Torrent * t, int * setmeSize )
2219{
2220    int                 i, atomCount, retCount;
2221    struct peer_atom ** atoms;
2222    struct peer_atom ** ret;
2223    const time_t        now = time( NULL );
2224    const int           seed = tr_torrentIsSeed( t->tor );
2225
2226    assert( torrentIsLocked( t ) );
2227
2228    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &atomCount );
2229    ret = tr_new( struct peer_atom*, atomCount );
2230    for( i = retCount = 0; i < atomCount; ++i )
2231    {
2232        int                interval;
2233        struct peer_atom * atom = atoms[i];
2234
2235        /* peer fed us too much bad data ... we only keep it around
2236         * now to weed it out in case someone sends it to us via pex */
2237        if( atom->myflags & MYFLAG_BANNED )
2238            continue;
2239
2240        /* peer was unconnectable before, so we're not going to keep trying.
2241         * this is needs a separate flag from `banned', since if they try
2242         * to connect to us later, we'll let them in */
2243        if( atom->myflags & MYFLAG_UNREACHABLE )
2244            continue;
2245
2246        /* we don't need two connections to the same peer... */
2247        if( peerIsInUse( t, &atom->addr ) )
2248            continue;
2249
2250        /* no need to connect if we're both seeds... */
2251        if( seed && ( ( atom->flags & ADDED_F_SEED_FLAG ) ||
2252                      ( atom->uploadOnly == UPLOAD_ONLY_YES ) ) )
2253            continue;
2254
2255        /* don't reconnect too often */
2256        interval = getReconnectIntervalSecs( atom );
2257        if( ( now - atom->time ) < interval )
2258        {
2259            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2260                    i, tr_peerIoAddrStr( &atom->addr, atom->port ), interval );
2261            continue;
2262        }
2263
2264        /* Don't connect to peers in our blocklist */
2265        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2266            continue;
2267
2268        ret[retCount++] = atom;
2269    }
2270
2271    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2272    *setmeSize = retCount;
2273    return ret;
2274}
2275
2276static int
2277reconnectPulse( void * vtorrent )
2278{
2279    Torrent *     t = vtorrent;
2280    static time_t prevTime = 0;
2281    static int    newConnectionsThisSecond = 0;
2282    time_t        now;
2283
2284    torrentLock( t );
2285
2286    now = time( NULL );
2287    if( prevTime != now )
2288    {
2289        prevTime = now;
2290        newConnectionsThisSecond = 0;
2291    }
2292
2293    if( !t->isRunning )
2294    {
2295        removeAllPeers( t );
2296    }
2297    else
2298    {
2299        int i, nCandidates, nBad;
2300        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
2301        struct tr_peer ** connections = getPeersToClose( t, &nBad );
2302
2303        if( nBad || nCandidates )
2304            tordbg( t, "reconnect pulse for [%s]: %d bad connections, "
2305                   "%d connection candidates, %d atoms, max per pulse is %d",
2306                   t->tor->info.name, nBad, nCandidates,
2307                   tr_ptrArraySize( t->pool ),
2308                   (int)MAX_RECONNECTIONS_PER_PULSE );
2309
2310        /* disconnect some peers.
2311           if we transferred piece data, then they might be good peers,
2312           so reset their `numFails' weight to zero.  otherwise we connected
2313           to them fruitlessly, so mark it as another fail */
2314        for( i = 0; i < nBad; ++i ) {
2315            tr_peer * peer = connections[i];
2316            struct peer_atom * atom = getExistingAtom( t, &peer->addr );
2317            if( atom->piece_data_time )
2318                atom->numFails = 0;
2319            else
2320                ++atom->numFails;
2321             
2322            tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
2323            removePeer( t, peer );
2324        }
2325
2326        /* add some new ones */
2327        for( i = 0;    ( i < nCandidates )
2328           && ( i < MAX_RECONNECTIONS_PER_PULSE )
2329           && ( getPeerCount( t ) < getMaxPeerCount( t->tor ) )
2330           && ( newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND ); ++i )
2331        {
2332            tr_peerMgr *       mgr = t->manager;
2333            struct peer_atom * atom = candidates[i];
2334            tr_peerIo *        io;
2335
2336            tordbg( t, "Starting an OUTGOING connection with %s",
2337                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2338
2339            io = tr_peerIoNewOutgoing( mgr->session, &atom->addr, atom->port, t->hash );
2340            if( io == NULL )
2341            {
2342                atom->myflags |= MYFLAG_UNREACHABLE;
2343            }
2344            else
2345            {
2346                tr_handshake * handshake = tr_handshakeNew( io,
2347                                                            mgr->session->encryptionMode,
2348                                                            myHandshakeDoneCB,
2349                                                            mgr );
2350
2351                assert( tr_peerIoGetTorrentHash( io ) );
2352
2353                ++newConnectionsThisSecond;
2354
2355                tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake,
2356                                         handshakeCompare );
2357            }
2358
2359            atom->time = time( NULL );
2360        }
2361
2362        /* cleanup */
2363        tr_free( connections );
2364        tr_free( candidates );
2365    }
2366
2367    torrentUnlock( t );
2368    return TRUE;
2369}
2370
2371/****
2372*****
2373*****  BANDWIDTH ALLOCATION
2374*****
2375****/
2376
2377static void
2378pumpAllPeers( tr_peerMgr * mgr )
2379{
2380    const int torrentCount = tr_ptrArraySize( mgr->torrents );
2381    int       i, j;
2382
2383    for( i=0; i<torrentCount; ++i )
2384    {
2385        Torrent * t = tr_ptrArrayNth( mgr->torrents, i );
2386        for( j=0; j<tr_ptrArraySize( t->peers ); ++j )
2387        {
2388            tr_peer * peer = tr_ptrArrayNth( t->peers, j );
2389            tr_peerMsgsPulse( peer->msgs );
2390        }
2391    }
2392}
2393
2394static int
2395bandwidthPulse( void * vmgr )
2396{
2397    tr_handshake * handshake;
2398    tr_peerMgr * mgr = vmgr;
2399    managerLock( mgr );
2400
2401    /* FIXME: this next line probably isn't necessary... */
2402    pumpAllPeers( mgr );
2403
2404    /* allocate bandwidth to the peers */
2405    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
2406    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
2407
2408    /* free all the finished handshakes */
2409    while(( handshake = tr_ptrArrayPop( mgr->finishedHandshakes )))
2410        tr_handshakeFree( handshake );
2411
2412    managerUnlock( mgr );
2413    return TRUE;
2414}
Note: See TracBrowser for help on using the repository browser.