source: trunk/libtransmission/peer-mgr.c @ 7524

Last change on this file since 7524 was 7524, checked in by charles, 12 years ago

(trunk libT) avoid some unnecessary memory fragmentation... for composited objects that have a tr_ptrArray, contain the tr_ptrArray directly rather than a pointer to one allocated elsewhere on the heap.

  • Property svn:keywords set to Date Rev Author Id
File size: 68.1 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 7524 2008-12-29 08:54:36Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h> /* memcpy, memcmp, strstr */
16#include <stdlib.h> /* qsort */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "session.h"
23#include "bandwidth.h"
24#include "bencode.h"
25#include "blocklist.h"
26#include "clients.h"
27#include "completion.h"
28#include "crypto.h"
29#include "handshake.h"
30#include "inout.h" /* tr_ioTestPiece */
31#include "net.h"
32#include "peer-io.h"
33#include "peer-mgr.h"
34#include "peer-mgr-private.h"
35#include "peer-msgs.h"
36#include "ptrarray.h"
37#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
38#include "torrent.h"
39#include "trevent.h"
40#include "utils.h"
41#include "webseed.h"
42
43enum
44{
45    /* how frequently to change which peers are choked */
46    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
47
48    /* minimum interval for refilling peers' request lists */
49    REFILL_PERIOD_MSEC = 333,
50
51    /* when many peers are available, keep idle ones this long */
52    MIN_UPLOAD_IDLE_SECS = ( 30 ),
53
54    /* when few peers are available, keep idle ones this long */
55    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
56
57    /* how frequently to decide which peers live and die */
58    RECONNECT_PERIOD_MSEC = ( 2 * 1000 ),
59   
60    /* how frequently to reallocate bandwidth */
61    BANDWIDTH_PERIOD_MSEC = 500,
62
63    /* max # of peers to ask fer per torrent per reconnect pulse */
64    MAX_RECONNECTIONS_PER_PULSE = 4,
65
66    /* max number of peers to ask for per second overall.
67    * this throttle is to avoid overloading the router */
68    MAX_CONNECTIONS_PER_SECOND = 8,
69
70    /* number of unchoked peers per torrent.
71     * FIXME: this probably ought to be configurable */
72    MAX_UNCHOKED_PEERS = 14,
73
74    /* number of bad pieces a peer is allowed to send before we ban them */
75    MAX_BAD_PIECES_PER_PEER = 5,
76
77    /* use for bitwise operations w/peer_atom.myflags */
78    MYFLAG_BANNED = 1,
79
80    /* unreachable for now... but not banned.
81     * if they try to connect to us it's okay */
82    MYFLAG_UNREACHABLE = 2,
83
84    /* the minimum we'll wait before attempting to reconnect to a peer */
85    MINIMUM_RECONNECT_INTERVAL_SECS = 5
86};
87
88
89/**
90***
91**/
92
93enum
94{
95    UPLOAD_ONLY_UKNOWN,
96    UPLOAD_ONLY_YES,
97    UPLOAD_ONLY_NO
98};
99
100/**
101 * Peer information that should be kept even before we've connected and
102 * after we've disconnected.  These are kept in a pool of peer_atoms to decide
103 * which ones would make good candidates for connecting to, and to watch out
104 * for banned peers.
105 *
106 * @see tr_peer
107 * @see tr_peermsgs
108 */
109struct peer_atom
110{
111    uint8_t     from;
112    uint8_t     flags;       /* these match the added_f flags */
113    uint8_t     myflags;     /* flags that aren't defined in added_f */
114    uint8_t     uploadOnly;  /* UPLOAD_ONLY_ */
115    tr_port     port;
116    uint16_t    numFails;
117    tr_address  addr;
118    time_t      time;        /* when the peer's connection status last changed */
119    time_t      piece_data_time;
120};
121
122typedef struct
123{
124    tr_bool         isRunning;
125
126    uint8_t         hash[SHA_DIGEST_LENGTH];
127    int         *   pendingRequestCount;
128    tr_ptrArray     outgoingHandshakes; /* tr_handshake */
129    tr_ptrArray     pool; /* struct peer_atom */
130    tr_ptrArray     peers; /* tr_peer */
131    tr_ptrArray     webseeds; /* tr_webseed */
132    tr_timer *      reconnectTimer;
133    tr_timer *      rechokeTimer;
134    tr_timer *      refillTimer;
135    tr_torrent *    tor;
136    tr_peer *       optimistic; /* the optimistic peer, or NULL if none */
137
138    struct tr_peerMgr * manager;
139}
140Torrent;
141
142struct tr_peerMgr
143{
144    tr_session      * session;
145    tr_ptrArray       torrents; /* Torrent */
146    tr_ptrArray       incomingHandshakes; /* tr_handshake */
147    tr_ptrArray       finishedHandshakes; /* tr_handshake */
148    tr_timer        * bandwidthTimer;
149};
150
151#define tordbg( t, ... ) \
152    do { \
153        if( tr_deepLoggingIsActive( ) ) \
154            tr_deepLog( __FILE__, __LINE__, t->tor->info.name, __VA_ARGS__ ); \
155    } while( 0 )
156
157#define dbgmsg( ... ) \
158    do { \
159        if( tr_deepLoggingIsActive( ) ) \
160            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
161    } while( 0 )
162
163/**
164***
165**/
166
167static void
168managerLock( const struct tr_peerMgr * manager )
169{
170    tr_globalLock( manager->session );
171}
172
173static void
174managerUnlock( const struct tr_peerMgr * manager )
175{
176    tr_globalUnlock( manager->session );
177}
178
179static void
180torrentLock( Torrent * torrent )
181{
182    managerLock( torrent->manager );
183}
184
185static void
186torrentUnlock( Torrent * torrent )
187{
188    managerUnlock( torrent->manager );
189}
190
191static int
192torrentIsLocked( const Torrent * t )
193{
194    return tr_globalIsLocked( t->manager->session );
195}
196
197/**
198***
199**/
200
201static int
202handshakeCompareToAddr( const void * va, const void * vb )
203{
204    const tr_handshake * a = va;
205
206    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
207}
208
209static int
210handshakeCompare( const void * a, const void * b )
211{
212    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
213}
214
215static tr_handshake*
216getExistingHandshake( tr_ptrArray      * handshakes,
217                      const tr_address * addr )
218{
219    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
220}
221
222static int
223comparePeerAtomToAddress( const void * va, const void * vb )
224{
225    const struct peer_atom * a = va;
226
227    return tr_compareAddresses( &a->addr, vb );
228}
229
230static int
231comparePeerAtoms( const void * va, const void * vb )
232{
233    const struct peer_atom * b = vb;
234
235    return comparePeerAtomToAddress( va, &b->addr );
236}
237
238/**
239***
240**/
241
242static int
243torrentCompare( const void * va,
244                const void * vb )
245{
246    const Torrent * a = va;
247    const Torrent * b = vb;
248
249    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
250}
251
252static int
253torrentCompareToHash( const void * va,
254                      const void * vb )
255{
256    const Torrent * a = va;
257    const uint8_t * b_hash = vb;
258
259    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
260}
261
262static Torrent*
263getExistingTorrent( tr_peerMgr *    manager,
264                    const uint8_t * hash )
265{
266    return (Torrent*) tr_ptrArrayFindSorted( &manager->torrents,
267                                             hash,
268                                             torrentCompareToHash );
269}
270
271static int
272peerCompare( const void * va, const void * vb )
273{
274    const tr_peer * a = va;
275    const tr_peer * b = vb;
276
277    return tr_compareAddresses( &a->addr, &b->addr );
278}
279
280static int
281peerCompareToAddr( const void * va, const void * vb )
282{
283    const tr_peer * a = va;
284
285    return tr_compareAddresses( &a->addr, vb );
286}
287
288static tr_peer*
289getExistingPeer( Torrent          * torrent,
290                 const tr_address * addr )
291{
292    assert( torrentIsLocked( torrent ) );
293    assert( addr );
294
295    return tr_ptrArrayFindSorted( &torrent->peers, addr, peerCompareToAddr );
296}
297
298static struct peer_atom*
299getExistingAtom( const Torrent    * t,
300                 const tr_address * addr )
301{
302    Torrent * tt = (Torrent*)t;
303    assert( torrentIsLocked( t ) );
304    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
305}
306
307static tr_bool
308peerIsInUse( const Torrent    * ct,
309             const tr_address * addr )
310{
311    Torrent * t = (Torrent*) ct;
312
313    assert( torrentIsLocked ( t ) );
314
315    return getExistingPeer( t, addr )
316        || getExistingHandshake( &t->outgoingHandshakes, addr )
317        || getExistingHandshake( &t->manager->incomingHandshakes, addr );
318}
319
320static tr_peer*
321peerConstructor( tr_torrent * tor, const tr_address * addr )
322{
323    tr_peer * p;
324    p = tr_new0( tr_peer, 1 );
325    p->addr = *addr;
326    p->bandwidth = tr_bandwidthNew( tor->session, tor->bandwidth );
327    return p;
328}
329
330static tr_peer*
331getPeer( Torrent          * torrent,
332         const tr_address * addr )
333{
334    tr_peer * peer;
335
336    assert( torrentIsLocked( torrent ) );
337
338    peer = getExistingPeer( torrent, addr );
339
340    if( peer == NULL )
341    {
342        peer = peerConstructor( torrent->tor, addr );
343        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
344    }
345
346    return peer;
347}
348
349static void
350peerDestructor( tr_peer * peer )
351{
352    assert( peer );
353
354    if( peer->msgs != NULL )
355    {
356        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
357        tr_peerMsgsFree( peer->msgs );
358    }
359
360    tr_peerIoFree( peer->io );
361
362    tr_bitfieldFree( peer->have );
363    tr_bitfieldFree( peer->blame );
364    tr_free( peer->client );
365
366    tr_bandwidthFree( peer->bandwidth );
367
368    tr_free( peer );
369}
370
371static void
372removePeer( Torrent * t,
373            tr_peer * peer )
374{
375    tr_peer *          removed;
376    struct peer_atom * atom;
377
378    assert( torrentIsLocked( t ) );
379
380    atom = getExistingAtom( t, &peer->addr );
381    assert( atom );
382    atom->time = time( NULL );
383
384    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
385    assert( removed == peer );
386    peerDestructor( removed );
387}
388
389static void
390removeAllPeers( Torrent * t )
391{
392    while( !tr_ptrArrayEmpty( &t->peers ) )
393        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
394}
395
396static void
397torrentDestructor( void * vt )
398{
399    Torrent * t = vt;
400    uint8_t   hash[SHA_DIGEST_LENGTH];
401
402    assert( t );
403    assert( !t->isRunning );
404    assert( torrentIsLocked( t ) );
405    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
406    assert( tr_ptrArrayEmpty( &t->peers ) );
407
408    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
409
410    tr_timerFree( &t->reconnectTimer );
411    tr_timerFree( &t->rechokeTimer );
412    tr_timerFree( &t->refillTimer );
413
414    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
415    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
416    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
417    tr_ptrArrayDestruct( &t->peers, NULL );
418
419    tr_free( t->pendingRequestCount );
420    tr_free( t );
421}
422
423static void peerCallbackFunc( void * vpeer,
424                              void * vevent,
425                              void * vt );
426
427static Torrent*
428torrentConstructor( tr_peerMgr * manager,
429                    tr_torrent * tor )
430{
431    int       i;
432    Torrent * t;
433
434    t = tr_new0( Torrent, 1 );
435    t->manager = manager;
436    t->tor = tor;
437    t->pool = TR_PTR_ARRAY_INIT;
438    t->peers = TR_PTR_ARRAY_INIT;
439    t->webseeds = TR_PTR_ARRAY_INIT;
440    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
441    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
442
443    for( i = 0; i < tor->info.webseedCount; ++i )
444    {
445        tr_webseed * w =
446            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
447        tr_ptrArrayAppend( &t->webseeds, w );
448    }
449
450    return t;
451}
452
453
454static int bandwidthPulse( void * vmgr );
455
456
457tr_peerMgr*
458tr_peerMgrNew( tr_session * session )
459{
460    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
461
462    m->session = session;
463    m->torrents = TR_PTR_ARRAY_INIT;
464    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
465    m->finishedHandshakes = TR_PTR_ARRAY_INIT;
466    m->bandwidthTimer = tr_timerNew( session, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
467    return m;
468}
469
470void
471tr_peerMgrFree( tr_peerMgr * manager )
472{
473    tr_handshake * handshake;
474
475    managerLock( manager );
476
477    tr_timerFree( &manager->bandwidthTimer );
478
479    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
480     * the item from manager->handshakes, so this is a little roundabout... */
481    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
482        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
483
484    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
485
486    while(( handshake = tr_ptrArrayPop( &manager->finishedHandshakes )))
487        tr_handshakeFree( handshake );
488
489    tr_ptrArrayDestruct( &manager->finishedHandshakes, NULL );
490
491    /* free the torrents. */
492    tr_ptrArrayDestruct( &manager->torrents, torrentDestructor );
493
494    managerUnlock( manager );
495    tr_free( manager );
496}
497
498static tr_peer**
499getConnectedPeers( Torrent * t, int * setmeCount )
500{
501    int i, peerCount, connectionCount;
502    tr_peer **peers;
503    tr_peer **ret;
504
505    assert( torrentIsLocked( t ) );
506
507    peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
508    ret = tr_new( tr_peer *, peerCount );
509
510    for( i = connectionCount = 0; i < peerCount; ++i )
511        if( peers[i]->msgs )
512            ret[connectionCount++] = peers[i];
513
514    *setmeCount = connectionCount;
515    return ret;
516}
517
518static int
519clientIsDownloadingFrom( const tr_peer * peer )
520{
521    return peer->clientIsInterested && !peer->clientIsChoked;
522}
523
524static int
525clientIsUploadingTo( const tr_peer * peer )
526{
527    return peer->peerIsInterested && !peer->peerIsChoked;
528}
529
530/***
531****
532***/
533
534tr_bool
535tr_peerMgrPeerIsSeed( const tr_peerMgr  * mgr,
536                      const uint8_t     * torrentHash,
537                      const tr_address  * addr )
538{
539    tr_bool isSeed = FALSE;
540    const Torrent * t = NULL;
541    const struct peer_atom * atom = NULL;
542
543    t = getExistingTorrent( (tr_peerMgr*)mgr, torrentHash );
544    if( t )
545        atom = getExistingAtom( t, addr );
546    if( atom )
547        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
548
549    return isSeed;
550}
551
552/****
553*****
554*****  REFILL
555*****
556****/
557
558static void
559assertValidPiece( Torrent * t, tr_piece_index_t piece )
560{
561    assert( t );
562    assert( t->tor );
563    assert( piece < t->tor->info.pieceCount );
564}
565
566static int
567getPieceRequests( Torrent * t, tr_piece_index_t piece )
568{
569    assertValidPiece( t, piece );
570
571    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
572}
573
574static void
575incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
576{
577    assertValidPiece( t, piece );
578
579    if( t->pendingRequestCount == NULL )
580        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
581    t->pendingRequestCount[piece]++;
582}
583
584static void
585decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
586{
587    assertValidPiece( t, piece );
588
589    if( t->pendingRequestCount )
590        t->pendingRequestCount[piece]--;
591}
592
593struct tr_refill_piece
594{
595    tr_priority_t    priority;
596    uint32_t         piece;
597    uint32_t         peerCount;
598    int              random;
599    int              pendingRequestCount;
600    int              missingBlockCount;
601};
602
603static int
604compareRefillPiece( const void * aIn, const void * bIn )
605{
606    const struct tr_refill_piece * a = aIn;
607    const struct tr_refill_piece * b = bIn;
608
609    /* if one piece has a higher priority, it goes first */
610    if( a->priority != b->priority )
611        return a->priority > b->priority ? -1 : 1;
612
613    /* have a per-priority endgame */
614    if( a->pendingRequestCount != b->pendingRequestCount )
615        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
616
617    /* fewer missing pieces goes first */
618    if( a->missingBlockCount != b->missingBlockCount )
619        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
620
621    /* otherwise if one has fewer peers, it goes first */
622    if( a->peerCount != b->peerCount )
623        return a->peerCount < b->peerCount ? -1 : 1;
624
625    /* otherwise go with our random seed */
626    if( a->random != b->random )
627        return a->random < b->random ? -1 : 1;
628
629    return 0;
630}
631
632static tr_piece_index_t *
633getPreferredPieces( Torrent * t, tr_piece_index_t * pieceCount )
634{
635    const tr_torrent  * tor = t->tor;
636    const tr_info     * inf = &tor->info;
637    tr_piece_index_t    i;
638    tr_piece_index_t    poolSize = 0;
639    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
640    int                 peerCount;
641    tr_peer**           peers;
642
643    assert( torrentIsLocked( t ) );
644
645    peers = getConnectedPeers( t, &peerCount );
646
647    /* make a list of the pieces that we want but don't have */
648    for( i = 0; i < inf->pieceCount; ++i )
649        if( !tor->info.pieces[i].dnd
650                && !tr_cpPieceIsComplete( tor->completion, i ) )
651            pool[poolSize++] = i;
652
653    /* sort the pool by which to request next */
654    if( poolSize > 1 )
655    {
656        tr_piece_index_t j;
657        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
658
659        for( j = 0; j < poolSize; ++j )
660        {
661            int k;
662            const tr_piece_index_t piece = pool[j];
663            struct tr_refill_piece * setme = p + j;
664
665            setme->piece = piece;
666            setme->priority = inf->pieces[piece].priority;
667            setme->peerCount = 0;
668            setme->random = tr_cryptoWeakRandInt( INT_MAX );
669            setme->pendingRequestCount = getPieceRequests( t, piece );
670            setme->missingBlockCount
671                         = tr_cpMissingBlocksInPiece( tor->completion, piece );
672
673            for( k = 0; k < peerCount; ++k )
674            {
675                const tr_peer * peer = peers[k];
676                if( peer->peerIsInterested
677                        && !peer->clientIsChoked
678                        && tr_bitfieldHas( peer->have, piece ) )
679                    ++setme->peerCount;
680            }
681        }
682
683        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
684               compareRefillPiece );
685
686        for( j = 0; j < poolSize; ++j )
687            pool[j] = p[j].piece;
688
689        tr_free( p );
690    }
691
692    tr_free( peers );
693
694    *pieceCount = poolSize;
695    return pool;
696}
697
698struct tr_blockIterator
699{
700    Torrent * t;
701    tr_block_index_t blockIndex, blockCount, *blocks;
702    tr_piece_index_t pieceIndex, pieceCount, *pieces;
703};
704
705static struct tr_blockIterator*
706blockIteratorNew( Torrent * t )
707{
708    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
709    i->t = t;
710    i->pieces = getPreferredPieces( t, &i->pieceCount );
711    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCount );
712    return i;
713}
714
715static int
716blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
717{
718    int found;
719    Torrent * t = i->t;
720    tr_torrent * tor = t->tor;
721
722    while( ( i->blockIndex == i->blockCount )
723        && ( i->pieceIndex < i->pieceCount ) )
724    {
725        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
726        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
727        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
728        tr_block_index_t block;
729
730        assert( index < tor->info.pieceCount );
731
732        i->blockCount = 0;
733        i->blockIndex = 0;
734        for( block=b; block!=e; ++block )
735            if( !tr_cpBlockIsComplete( tor->completion, block ) )
736                i->blocks[i->blockCount++] = block;
737    }
738
739    if(( found = ( i->blockIndex < i->blockCount )))
740        *setme = i->blocks[i->blockIndex++];
741
742    return found;
743}
744
745static void
746blockIteratorFree( struct tr_blockIterator * i )
747{
748    tr_free( i->blocks );
749    tr_free( i->pieces );
750    tr_free( i );
751}
752
753static tr_peer**
754getPeersUploadingToClient( Torrent * t,
755                           int *     setmeCount )
756{
757    int j;
758    int peerCount = 0;
759    int retCount = 0;
760    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
761    tr_peer ** ret = tr_new( tr_peer *, peerCount );
762
763    j = 0; /* this is a temporary test to make sure we walk through all the peers */
764    if( peerCount )
765    {
766        /* Get a list of peers we're downloading from.
767           Pick a different starting point each time so all peers
768           get a chance at being the first in line */
769        const int fencepost = tr_cryptoWeakRandInt( peerCount );
770        int i = fencepost;
771        do {
772            if( clientIsDownloadingFrom( peers[i] ) )
773                ret[retCount++] = peers[i];
774            i = ( i + 1 ) % peerCount;
775            ++j;
776        } while( i != fencepost );
777    }
778    assert( j == peerCount );
779    *setmeCount = retCount;
780    return ret;
781}
782
783static uint32_t
784getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
785{
786    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
787    const uint64_t blockPos = tor->blockSize * b;
788    assert( blockPos >= piecePos );
789    return (uint32_t)( blockPos - piecePos );
790}
791
792static int
793refillPulse( void * vtorrent )
794{
795    tr_block_index_t block;
796    int peerCount;
797    int webseedCount;
798    tr_peer ** peers;
799    tr_webseed ** webseeds;
800    struct tr_blockIterator * blockIterator;
801    Torrent * t = vtorrent;
802    tr_torrent * tor = t->tor;
803
804    if( !t->isRunning )
805        return TRUE;
806    if( tr_torrentIsSeed( t->tor ) )
807        return TRUE;
808
809    torrentLock( t );
810    tordbg( t, "Refilling Request Buffers..." );
811
812    blockIterator = blockIteratorNew( t );
813    peers = getPeersUploadingToClient( t, &peerCount );
814    webseedCount = tr_ptrArraySize( &t->webseeds );
815    webseeds = tr_memdup( tr_ptrArrayBase( &t->webseeds ),
816                          webseedCount * sizeof( tr_webseed* ) );
817
818    while( ( webseedCount || peerCount )
819        && blockIteratorNext( blockIterator, &block ) )
820    {
821        int j;
822        int handled = FALSE;
823
824        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
825        const uint32_t offset = getBlockOffsetInPiece( tor, block );
826        const uint32_t length = tr_torBlockCountBytes( tor, block );
827
828        /* find a peer who can ask for this block */
829        for( j=0; !handled && j<peerCount; )
830        {
831            const tr_addreq_t val = tr_peerMsgsAddRequest( peers[j]->msgs, index, offset, length );
832            switch( val )
833            {
834                case TR_ADDREQ_FULL:
835                case TR_ADDREQ_CLIENT_CHOKED:
836                    peers[j] = peers[--peerCount];
837                    break;
838
839                case TR_ADDREQ_MISSING:
840                case TR_ADDREQ_DUPLICATE:
841                    ++j;
842                    break;
843
844                case TR_ADDREQ_OK:
845                    incrementPieceRequests( t, index );
846                    handled = TRUE;
847                    break;
848
849                default:
850                    assert( 0 && "unhandled value" );
851                    break;
852            }
853        }
854
855        /* maybe one of the webseeds can do it */
856        for( j=0; !handled && j<webseedCount; )
857        {
858            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j], index, offset, length );
859            switch( val )
860            {
861                case TR_ADDREQ_FULL:
862                    webseeds[j] = webseeds[--webseedCount];
863                    break;
864
865                case TR_ADDREQ_OK:
866                    incrementPieceRequests( t, index );
867                    handled = TRUE;
868                    break;
869
870                default:
871                    assert( 0 && "unhandled value" );
872                    break;
873            }
874        }
875    }
876
877    /* cleanup */
878    blockIteratorFree( blockIterator );
879    tr_free( webseeds );
880    tr_free( peers );
881
882    t->refillTimer = NULL;
883    torrentUnlock( t );
884    return FALSE;
885}
886
887static void
888broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
889{
890    int i, size;
891    tr_peer ** peers;
892
893    assert( torrentIsLocked( t ) );
894
895    tordbg( t, "got a block; cancelling any duplicate requests from peers %"PRIu32":%"PRIu32"->%"PRIu32, index, offset, length );
896    peers = getConnectedPeers( t, &size );
897    for( i=0; i<size; ++i )
898        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
899    tr_free( peers );
900}
901
902static void
903addStrike( Torrent * t,
904           tr_peer * peer )
905{
906    tordbg( t, "increasing peer %s strike count to %d",
907            tr_peerIoAddrStr( &peer->addr,
908                              peer->port ), peer->strikes + 1 );
909
910    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
911    {
912        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
913        atom->myflags |= MYFLAG_BANNED;
914        peer->doPurge = 1;
915        tordbg( t, "banning peer %s", tr_peerIoAddrStr( &atom->addr, atom->port ) );
916    }
917}
918
919static void
920gotBadPiece( Torrent *        t,
921             tr_piece_index_t pieceIndex )
922{
923    tr_torrent *   tor = t->tor;
924    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
925
926    tor->corruptCur += byteCount;
927    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
928}
929
930static void
931refillSoon( Torrent * t )
932{
933    if( t->refillTimer == NULL )
934        t->refillTimer = tr_timerNew( t->manager->session,
935                                      refillPulse, t,
936                                      REFILL_PERIOD_MSEC );
937}
938
939static void
940peerSuggestedPiece( Torrent            * t UNUSED,
941                    tr_peer            * peer UNUSED,
942                    tr_piece_index_t     pieceIndex UNUSED,
943                    int                  isFastAllowed UNUSED )
944{
945#if 0
946    assert( t );
947    assert( peer );
948    assert( peer->msgs );
949
950    /* is this a valid piece? */
951    if(  pieceIndex >= t->tor->info.pieceCount )
952        return;
953
954    /* don't ask for it if we've already got it */
955    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
956        return;
957
958    /* don't ask for it if they don't have it */
959    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
960        return;
961
962    /* don't ask for it if we're choked and it's not fast */
963    if( !isFastAllowed && peer->clientIsChoked )
964        return;
965
966    /* request the blocks that we don't have in this piece */
967    {
968        tr_block_index_t block;
969        const tr_torrent * tor = t->tor;
970        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
971        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
972
973        for( block=start; block<end; ++block )
974        {
975            if( !tr_cpBlockIsComplete( tor->completion, block ) )
976            {
977                const uint32_t offset = getBlockOffsetInPiece( tor, block );
978                const uint32_t length = tr_torBlockCountBytes( tor, block );
979                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
980                incrementPieceRequests( t, pieceIndex );
981            }
982        }
983    }
984#endif
985}
986
987static void
988peerCallbackFunc( void * vpeer, void * vevent, void * vt )
989{
990    tr_peer * peer = vpeer; /* may be NULL if peer is a webseed */
991    Torrent * t = vt;
992    const tr_peer_event * e = vevent;
993
994    torrentLock( t );
995
996    switch( e->eventType )
997    {
998        case TR_PEER_UPLOAD_ONLY:
999            /* update our atom */
1000            if( peer ) {
1001                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1002                a->uploadOnly = e->uploadOnly ? UPLOAD_ONLY_YES : UPLOAD_ONLY_NO;
1003            }
1004            break;
1005
1006        case TR_PEER_NEED_REQ:
1007            refillSoon( t );
1008            break;
1009
1010        case TR_PEER_CANCEL:
1011            decrementPieceRequests( t, e->pieceIndex );
1012            break;
1013
1014        case TR_PEER_PEER_GOT_DATA:
1015        {
1016            const time_t now = time( NULL );
1017            tr_torrent * tor = t->tor;
1018
1019            tor->activityDate = now;
1020
1021            if( e->wasPieceData )
1022                tor->uploadedCur += e->length;
1023
1024            /* update the stats */
1025            if( e->wasPieceData )
1026                tr_statsAddUploaded( tor->session, e->length );
1027
1028            /* update our atom */
1029            if( peer ) {
1030                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1031                if( e->wasPieceData )
1032                    a->piece_data_time = now;
1033            }
1034
1035            break;
1036        }
1037
1038        case TR_PEER_CLIENT_GOT_SUGGEST:
1039            if( peer )
1040                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1041            break;
1042
1043        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1044            if( peer )
1045                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1046            break;
1047
1048        case TR_PEER_CLIENT_GOT_DATA:
1049        {
1050            const time_t now = time( NULL );
1051            tr_torrent * tor = t->tor;
1052            tor->activityDate = now;
1053
1054            /* only add this to downloadedCur if we got it from a peer --
1055             * webseeds shouldn't count against our ratio.  As one tracker
1056             * admin put it, "Those pieces are downloaded directly from the
1057             * content distributor, not the peers, it is the tracker's job
1058             * to manage the swarms, not the web server and does not fit
1059             * into the jurisdiction of the tracker." */
1060            if( peer && e->wasPieceData )
1061                tor->downloadedCur += e->length;
1062
1063            /* update the stats */ 
1064            if( e->wasPieceData )
1065                tr_statsAddDownloaded( tor->session, e->length );
1066
1067            /* update our atom */
1068            if( peer ) {
1069                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1070                if( e->wasPieceData )
1071                    a->piece_data_time = now;
1072            }
1073
1074            break;
1075        }
1076
1077        case TR_PEER_PEER_PROGRESS:
1078        {
1079            if( peer )
1080            {
1081                struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1082                const int peerIsSeed = e->progress >= 1.0;
1083                if( peerIsSeed ) {
1084                    tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1085                    atom->flags |= ADDED_F_SEED_FLAG;
1086                } else {
1087                    tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1088                    atom->flags &= ~ADDED_F_SEED_FLAG;
1089                }
1090            }
1091            break;
1092        }
1093
1094        case TR_PEER_CLIENT_GOT_BLOCK:
1095        {
1096            tr_torrent *     tor = t->tor;
1097
1098            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1099
1100            tr_cpBlockAdd( tor->completion, block );
1101            decrementPieceRequests( t, e->pieceIndex );
1102
1103            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1104
1105            if( tr_cpPieceIsComplete( tor->completion, e->pieceIndex ) )
1106            {
1107                const tr_piece_index_t p = e->pieceIndex;
1108                const tr_bool ok = tr_ioTestPiece( tor, p );
1109
1110                if( !ok )
1111                {
1112                    tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1113                               (unsigned long)p );
1114                }
1115
1116                tr_torrentSetHasPiece( tor, p, ok );
1117                tr_torrentSetPieceChecked( tor, p, TRUE );
1118                tr_peerMgrSetBlame( tor->session->peerMgr, tor->info.hash, p, ok );
1119
1120                if( !ok )
1121                    gotBadPiece( t, p );
1122                else
1123                {
1124                    int i, peerCount;
1125                    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1126                    for( i = 0; i < peerCount; ++i )
1127                        tr_peerMsgsHave( peers[i]->msgs, p );
1128                    tr_free( peers );
1129                }
1130
1131                tr_torrentRecheckCompleteness( tor );
1132            }
1133            break;
1134        }
1135
1136        case TR_PEER_ERROR:
1137            if( e->err == EINVAL )
1138            {
1139                addStrike( t, peer );
1140                peer->doPurge = 1;
1141                tordbg( t, "setting %s doPurge flag because we got an EINVAL error", tr_peerIoAddrStr( &peer->addr, peer->port ) );
1142            }
1143            else if( ( e->err == ERANGE )
1144                  || ( e->err == EMSGSIZE )
1145                  || ( e->err == ENOTCONN ) )
1146            {
1147                /* some protocol error from the peer */
1148                peer->doPurge = 1;
1149                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error", tr_peerIoAddrStr( &peer->addr, peer->port ) );
1150            }
1151            else /* a local error, such as an IO error */
1152            {
1153                t->tor->error = e->err;
1154                tr_strlcpy( t->tor->errorString,
1155                            tr_strerror( t->tor->error ),
1156                            sizeof( t->tor->errorString ) );
1157                tr_torrentStop( t->tor );
1158            }
1159            break;
1160
1161        default:
1162            assert( 0 );
1163    }
1164
1165    torrentUnlock( t );
1166}
1167
1168static void
1169ensureAtomExists( Torrent          * t,
1170                  const tr_address * addr,
1171                  tr_port            port,
1172                  uint8_t            flags,
1173                  uint8_t            from )
1174{
1175    if( getExistingAtom( t, addr ) == NULL )
1176    {
1177        struct peer_atom * a;
1178        a = tr_new0( struct peer_atom, 1 );
1179        a->addr = *addr;
1180        a->port = port;
1181        a->flags = flags;
1182        a->from = from;
1183        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr( &a->addr, a->port ) );
1184        tr_ptrArrayInsertSorted( &t->pool, a, comparePeerAtoms );
1185    }
1186}
1187
1188static int
1189getMaxPeerCount( const tr_torrent * tor )
1190{
1191    return tor->maxConnectedPeers;
1192}
1193
1194static int
1195getPeerCount( const Torrent * t )
1196{
1197    return tr_ptrArraySize( &t->peers ) + tr_ptrArraySize( &t->outgoingHandshakes );
1198}
1199
1200/* FIXME: this is kind of a mess. */
1201static tr_bool
1202myHandshakeDoneCB( tr_handshake  * handshake,
1203                   tr_peerIo     * io,
1204                   int             isConnected,
1205                   const uint8_t * peer_id,
1206                   void          * vmanager )
1207{
1208    tr_bool            ok = isConnected;
1209    tr_bool            success = FALSE;
1210    tr_port            port;
1211    const tr_address * addr;
1212    tr_peerMgr       * manager = vmanager;
1213    Torrent          * t;
1214    tr_handshake     * ours;
1215
1216    assert( io );
1217    assert( isConnected == 0 || isConnected == 1 );
1218
1219    t = tr_peerIoHasTorrentHash( io )
1220        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1221        : NULL;
1222
1223    if( tr_peerIoIsIncoming ( io ) )
1224        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1225                                        handshake, handshakeCompare );
1226    else if( t )
1227        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1228                                        handshake, handshakeCompare );
1229    else
1230        ours = handshake;
1231
1232    assert( ours );
1233    assert( ours == handshake );
1234
1235    if( t )
1236        torrentLock( t );
1237
1238    addr = tr_peerIoGetAddress( io, &port );
1239
1240    if( !ok || !t || !t->isRunning )
1241    {
1242        if( t )
1243        {
1244            struct peer_atom * atom = getExistingAtom( t, addr );
1245            if( atom )
1246                ++atom->numFails;
1247        }
1248    }
1249    else /* looking good */
1250    {
1251        struct peer_atom * atom;
1252        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1253        atom = getExistingAtom( t, addr );
1254        atom->time = time( NULL );
1255        atom->piece_data_time = 0;
1256
1257        if( atom->myflags & MYFLAG_BANNED )
1258        {
1259            tordbg( t, "banned peer %s tried to reconnect",
1260                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
1261        }
1262        else if( tr_peerIoIsIncoming( io )
1263               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1264
1265        {
1266        }
1267        else
1268        {
1269            tr_peer * peer = getExistingPeer( t, addr );
1270
1271            if( peer ) /* we already have this peer */
1272            {
1273            }
1274            else
1275            {
1276                peer = getPeer( t, addr );
1277                tr_free( peer->client );
1278
1279                if( !peer_id )
1280                    peer->client = NULL;
1281                else {
1282                    char client[128];
1283                    tr_clientForId( client, sizeof( client ), peer_id );
1284                    peer->client = tr_strdup( client );
1285                }
1286
1287                peer->port = port;
1288                peer->io = tr_handshakeStealIO( handshake );
1289                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1290                tr_peerIoSetBandwidth( io, peer->bandwidth );
1291
1292                success = TRUE;
1293            }
1294        }
1295    }
1296
1297    if( !success )
1298        tr_ptrArrayAppend( &manager->finishedHandshakes, handshake );
1299
1300    if( t )
1301        torrentUnlock( t );
1302
1303    return success;
1304}
1305
1306void
1307tr_peerMgrAddIncoming( tr_peerMgr * manager,
1308                       tr_address * addr,
1309                       tr_port      port,
1310                       int          socket )
1311{
1312    managerLock( manager );
1313
1314    if( tr_sessionIsAddressBlocked( manager->session, addr ) )
1315    {
1316        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
1317        tr_netClose( socket );
1318    }
1319    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
1320    {
1321        tr_netClose( socket );
1322    }
1323    else /* we don't have a connetion to them yet... */
1324    {
1325        tr_peerIo *    io;
1326        tr_handshake * handshake;
1327
1328        io = tr_peerIoNewIncoming( manager->session, addr, port, socket );
1329
1330        handshake = tr_handshakeNew( io,
1331                                     manager->session->encryptionMode,
1332                                     myHandshakeDoneCB,
1333                                     manager );
1334
1335        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
1336                                 handshakeCompare );
1337    }
1338
1339    managerUnlock( manager );
1340}
1341
1342static tr_bool
1343tr_isPex( const tr_pex * pex )
1344{
1345    return pex && tr_isAddress( &pex->addr );
1346}
1347
1348void
1349tr_peerMgrAddPex( tr_peerMgr *    manager,
1350                  const uint8_t * torrentHash,
1351                  uint8_t         from,
1352                  const tr_pex *  pex )
1353{
1354    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
1355    {
1356        Torrent * t;
1357        managerLock( manager );
1358
1359        t = getExistingTorrent( manager, torrentHash );
1360        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
1361            ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from );
1362
1363        managerUnlock( manager );
1364    }
1365}
1366
1367tr_pex *
1368tr_peerMgrCompactToPex( const void *    compact,
1369                        size_t          compactLen,
1370                        const uint8_t * added_f,
1371                        size_t          added_f_len,
1372                        size_t *        pexCount )
1373{
1374    size_t          i;
1375    size_t          n = compactLen / 6;
1376    const uint8_t * walk = compact;
1377    tr_pex *        pex = tr_new0( tr_pex, n );
1378
1379    for( i = 0; i < n; ++i )
1380    {
1381        pex[i].addr.type = TR_AF_INET;
1382        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
1383        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1384        if( added_f && ( n == added_f_len ) )
1385            pex[i].flags = added_f[i];
1386    }
1387
1388    *pexCount = n;
1389    return pex;
1390}
1391
1392tr_pex *
1393tr_peerMgrCompact6ToPex( const void    * compact,
1394                         size_t          compactLen,
1395                         const uint8_t * added_f,
1396                         size_t          added_f_len,
1397                         size_t        * pexCount )
1398{
1399    size_t          i;
1400    size_t          n = compactLen / 18;
1401    const uint8_t * walk = compact;
1402    tr_pex *        pex = tr_new0( tr_pex, n );
1403   
1404    for( i = 0; i < n; ++i )
1405    {
1406        pex[i].addr.type = TR_AF_INET6;
1407        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
1408        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1409        if( added_f && ( n == added_f_len ) )
1410            pex[i].flags = added_f[i];
1411    }
1412   
1413    *pexCount = n;
1414    return pex;
1415}
1416
1417tr_pex *
1418tr_peerMgrArrayToPex( const void * array,
1419                      size_t       arrayLen,
1420                      size_t      * pexCount )
1421{
1422    size_t          i;
1423    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
1424    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
1425    const uint8_t * walk = array;
1426    tr_pex        * pex = tr_new0( tr_pex, n );
1427   
1428    for( i = 0 ; i < n ; i++ ) {
1429        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
1430        tr_suspectAddress( &pex[i].addr, "tracker"  );
1431        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
1432        pex[i].flags = 0x00;
1433        walk += sizeof( tr_address ) + 2;
1434    }
1435   
1436    *pexCount = n;
1437    return pex;
1438}
1439
1440/**
1441***
1442**/
1443
1444void
1445tr_peerMgrSetBlame( tr_peerMgr *     manager,
1446                    const uint8_t *  torrentHash,
1447                    tr_piece_index_t pieceIndex,
1448                    int              success )
1449{
1450    if( !success )
1451    {
1452        int        peerCount, i;
1453        Torrent *  t = getExistingTorrent( manager, torrentHash );
1454        tr_peer ** peers;
1455
1456        assert( torrentIsLocked( t ) );
1457
1458        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
1459        for( i = 0; i < peerCount; ++i )
1460        {
1461            tr_peer * peer = peers[i];
1462            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1463            {
1464                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1465                        tr_peerIoAddrStr( &peer->addr, peer->port ),
1466                        pieceIndex, (int)peer->strikes + 1 );
1467                addStrike( t, peer );
1468            }
1469        }
1470    }
1471}
1472
1473int
1474tr_pexCompare( const void * va, const void * vb )
1475{
1476    const tr_pex * a = va;
1477    const tr_pex * b = vb;
1478    int i;
1479
1480    assert( tr_isPex( a ) );
1481    assert( tr_isPex( b ) );
1482
1483    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
1484        return i;
1485
1486    if( a->port != b->port )
1487        return a->port < b->port ? -1 : 1;
1488
1489    return 0;
1490}
1491
1492static int
1493peerPrefersCrypto( const tr_peer * peer )
1494{
1495    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1496        return TRUE;
1497
1498    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1499        return FALSE;
1500
1501    return tr_peerIoIsEncrypted( peer->io );
1502}
1503
1504int
1505tr_peerMgrGetPeers( tr_peerMgr      * manager,
1506                    const uint8_t   * torrentHash,
1507                    tr_pex         ** setme_pex,
1508                    uint8_t           af)
1509{
1510    int peersReturning = 0;
1511    const Torrent *  t;
1512
1513    managerLock( manager );
1514
1515    t = getExistingTorrent( manager, torrentHash );
1516    if( t == NULL )
1517    {
1518        *setme_pex = NULL;
1519    }
1520    else
1521    {
1522        int i;
1523        const tr_peer ** peers = (const tr_peer**) TR_PTR_ARRAY_DATA( &t->peers );
1524        const int peerCount = TR_PTR_ARRAY_LENGTH( &t->peers );
1525        /* for now, this will waste memory on torrents that have both
1526         * ipv6 and ipv4 peers */
1527        tr_pex * pex = tr_new( tr_pex, peerCount );
1528        tr_pex * walk = pex;
1529
1530        for( i=0; i<peerCount; ++i )
1531        {
1532            const tr_peer * peer = peers[i];
1533            if( peer->addr.type == af )
1534            {
1535                const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1536
1537                assert( tr_isAddress( &peer->addr ) );
1538                walk->addr = peer->addr;
1539                walk->port = peer->port;
1540                walk->flags = 0;
1541                if( peerPrefersCrypto( peer ) )
1542                    walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1543                if( ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 ) )
1544                    walk->flags |= ADDED_F_SEED_FLAG;
1545                ++peersReturning;
1546                ++walk;
1547            }
1548        }
1549
1550        assert( ( walk - pex ) == peersReturning );
1551        qsort( pex, peersReturning, sizeof( tr_pex ), tr_pexCompare );
1552        *setme_pex = pex;
1553    }
1554
1555    managerUnlock( manager );
1556    return peersReturning;
1557}
1558
1559static int reconnectPulse( void * vtorrent );
1560
1561static int rechokePulse( void * vtorrent );
1562
1563void
1564tr_peerMgrStartTorrent( tr_peerMgr *    manager,
1565                        const uint8_t * torrentHash )
1566{
1567    Torrent * t;
1568
1569    managerLock( manager );
1570
1571    t = getExistingTorrent( manager, torrentHash );
1572
1573    assert( t );
1574    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1575    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1576
1577    if( !t->isRunning )
1578    {
1579        t->isRunning = 1;
1580
1581        t->reconnectTimer = tr_timerNew( t->manager->session,
1582                                         reconnectPulse, t,
1583                                         RECONNECT_PERIOD_MSEC );
1584
1585        t->rechokeTimer = tr_timerNew( t->manager->session,
1586                                       rechokePulse, t,
1587                                       RECHOKE_PERIOD_MSEC );
1588
1589        reconnectPulse( t );
1590
1591        rechokePulse( t );
1592
1593        if( !tr_ptrArrayEmpty( &t->webseeds ) )
1594            refillSoon( t );
1595    }
1596
1597    managerUnlock( manager );
1598}
1599
1600static void
1601stopTorrent( Torrent * t )
1602{
1603    assert( torrentIsLocked( t ) );
1604
1605    t->isRunning = 0;
1606    tr_timerFree( &t->rechokeTimer );
1607    tr_timerFree( &t->reconnectTimer );
1608
1609    /* disconnect the peers. */
1610    tr_ptrArrayForeach( &t->peers, (PtrArrayForeachFunc)peerDestructor );
1611    tr_ptrArrayClear( &t->peers );
1612
1613    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1614     * which removes the handshake from t->outgoingHandshakes... */
1615    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
1616        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
1617}
1618
1619void
1620tr_peerMgrStopTorrent( tr_peerMgr *    manager,
1621                       const uint8_t * torrentHash )
1622{
1623    managerLock( manager );
1624
1625    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1626
1627    managerUnlock( manager );
1628}
1629
1630void
1631tr_peerMgrAddTorrent( tr_peerMgr * manager,
1632                      tr_torrent * tor )
1633{
1634    Torrent * t;
1635
1636    managerLock( manager );
1637
1638    assert( tor );
1639    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1640
1641    t = torrentConstructor( manager, tor );
1642    tr_ptrArrayInsertSorted( &manager->torrents, t, torrentCompare );
1643
1644    managerUnlock( manager );
1645}
1646
1647void
1648tr_peerMgrRemoveTorrent( tr_peerMgr *    manager,
1649                         const uint8_t * torrentHash )
1650{
1651    Torrent * t;
1652
1653    managerLock( manager );
1654
1655    t = getExistingTorrent( manager, torrentHash );
1656    assert( t );
1657    stopTorrent( t );
1658    tr_ptrArrayRemoveSorted( &manager->torrents, t, torrentCompare );
1659    torrentDestructor( t );
1660
1661    managerUnlock( manager );
1662}
1663
1664void
1665tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1666                               const uint8_t *    torrentHash,
1667                               int8_t *           tab,
1668                               unsigned int       tabCount )
1669{
1670    tr_piece_index_t   i;
1671    const Torrent *    t;
1672    const tr_torrent * tor;
1673    float              interval;
1674    tr_bool            isSeed;
1675    int                peerCount;
1676    const tr_peer **   peers;
1677
1678    managerLock( manager );
1679
1680    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1681    tor = t->tor;
1682    interval = tor->info.pieceCount / (float)tabCount;
1683    isSeed = tor && ( tr_cpGetStatus ( tor->completion ) == TR_SEED );
1684    peers = (const tr_peer **) TR_PTR_ARRAY_DATA( &t->peers );
1685    peerCount = TR_PTR_ARRAY_LENGTH( &t->peers );
1686
1687    memset( tab, 0, tabCount );
1688
1689    for( i = 0; tor && i < tabCount; ++i )
1690    {
1691        const int piece = i * interval;
1692
1693        if( isSeed || tr_cpPieceIsComplete( tor->completion, piece ) )
1694            tab[i] = -1;
1695        else if( peerCount ) {
1696            int j;
1697            for( j = 0; j < peerCount; ++j )
1698                if( tr_bitfieldHas( peers[j]->have, i ) )
1699                    ++tab[i];
1700        }
1701    }
1702
1703    managerUnlock( manager );
1704}
1705
1706/* Returns the pieces that are available from peers */
1707tr_bitfield*
1708tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1709                        const uint8_t *    torrentHash )
1710{
1711    int           i, size;
1712    Torrent *     t;
1713    tr_peer **    peers;
1714    tr_bitfield * pieces;
1715    managerLock( manager );
1716
1717    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1718    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1719    peers = getConnectedPeers( t, &size );
1720    for( i = 0; i < size; ++i )
1721        tr_bitfieldOr( pieces, peers[i]->have );
1722
1723    managerUnlock( manager );
1724    tr_free( peers );
1725    return pieces;
1726}
1727
1728int
1729tr_peerMgrHasConnections( const tr_peerMgr * manager,
1730                          const uint8_t *    torrentHash )
1731{
1732    int ret;
1733    const Torrent * t;
1734    managerLock( manager );
1735
1736    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1737    ret = t && ( !tr_ptrArrayEmpty( &t->peers ) || !tr_ptrArrayEmpty( &t->webseeds ) );
1738
1739    managerUnlock( manager );
1740    return ret;
1741}
1742
1743void
1744tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1745                        const uint8_t    * torrentHash,
1746                        int              * setmePeersKnown,
1747                        int              * setmePeersConnected,
1748                        int              * setmeSeedsConnected,
1749                        int              * setmeWebseedsSendingToUs,
1750                        int              * setmePeersSendingToUs,
1751                        int              * setmePeersGettingFromUs,
1752                        int              * setmePeersFrom )
1753{
1754    int i, size;
1755    const Torrent * t;
1756    const tr_peer ** peers;
1757    const tr_webseed ** webseeds;
1758
1759    managerLock( manager );
1760
1761    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1762    peers = (const tr_peer **) TR_PTR_ARRAY_DATA( &t->peers );
1763    size = TR_PTR_ARRAY_LENGTH( &t->peers );
1764
1765    *setmePeersKnown           = tr_ptrArraySize( &t->pool );
1766    *setmePeersConnected       = 0;
1767    *setmeSeedsConnected       = 0;
1768    *setmePeersGettingFromUs   = 0;
1769    *setmePeersSendingToUs     = 0;
1770    *setmeWebseedsSendingToUs  = 0;
1771
1772    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1773        setmePeersFrom[i] = 0;
1774
1775    for( i=0; i<size; ++i )
1776    {
1777        const tr_peer * peer = peers[i];
1778        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1779
1780        if( peer->io == NULL ) /* not connected */
1781            continue;
1782
1783        ++*setmePeersConnected;
1784
1785        ++setmePeersFrom[atom->from];
1786
1787        if( clientIsDownloadingFrom( peer ) )
1788            ++*setmePeersSendingToUs;
1789
1790        if( clientIsUploadingTo( peer ) )
1791            ++*setmePeersGettingFromUs;
1792
1793        if( atom->flags & ADDED_F_SEED_FLAG )
1794            ++*setmeSeedsConnected;
1795    }
1796
1797    webseeds = (const tr_webseed**) TR_PTR_ARRAY_DATA( &t->webseeds );
1798    size = TR_PTR_ARRAY_LENGTH( &t->webseeds );
1799    for( i=0; i<size; ++i )
1800        if( tr_webseedIsActive( webseeds[i] ) )
1801            ++*setmeWebseedsSendingToUs;
1802
1803    managerUnlock( manager );
1804}
1805
1806float*
1807tr_peerMgrWebSpeeds( const tr_peerMgr * manager,
1808                     const uint8_t *    torrentHash )
1809{
1810    const Torrent * t;
1811    const tr_webseed ** webseeds;
1812    int i;
1813    int webseedCount;
1814    float * ret;
1815
1816    assert( manager );
1817    managerLock( manager );
1818
1819    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1820    webseeds = (const tr_webseed**) TR_PTR_ARRAY_DATA( &t->webseeds );
1821    webseedCount = TR_PTR_ARRAY_LENGTH( &t->webseeds );
1822    assert( webseedCount == t->tor->info.webseedCount );
1823    ret = tr_new0( float, webseedCount );
1824
1825    for( i=0; i<webseedCount; ++i )
1826        if( !tr_webseedGetSpeed( webseeds[i], &ret[i] ) )
1827            ret[i] = -1.0;
1828
1829    managerUnlock( manager );
1830    return ret;
1831}
1832
1833double
1834tr_peerGetPieceSpeed( const tr_peer * peer, tr_direction direction )
1835{
1836    assert( peer );
1837    assert( direction==TR_CLIENT_TO_PEER || direction==TR_PEER_TO_CLIENT );
1838
1839    return tr_bandwidthGetPieceSpeed( peer->bandwidth, direction );
1840}
1841
1842
1843struct tr_peer_stat *
1844tr_peerMgrPeerStats( const   tr_peerMgr  * manager,
1845                     const   uint8_t     * torrentHash,
1846                     int                 * setmeCount UNUSED )
1847{
1848    int             i, size;
1849    const Torrent * t;
1850    tr_peer **      peers;
1851    tr_peer_stat *  ret;
1852
1853    assert( manager );
1854    managerLock( manager );
1855
1856    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1857    peers = getConnectedPeers( (Torrent*)t, &size );
1858    ret = tr_new0( tr_peer_stat, size );
1859
1860    for( i = 0; i < size; ++i )
1861    {
1862        char *                   pch;
1863        const tr_peer *          peer = peers[i];
1864        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1865        tr_peer_stat *           stat = ret + i;
1866        tr_address               norm_addr;
1867
1868        norm_addr = peer->addr;
1869        tr_normalizeV4Mapped( &norm_addr );
1870        tr_ntop( &norm_addr, stat->addr, sizeof( stat->addr ) );
1871        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
1872                   sizeof( stat->client ) );
1873        stat->port               = ntohs( peer->port );
1874        stat->from               = atom->from;
1875        stat->progress           = peer->progress;
1876        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1877        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER );
1878        stat->rateToClient       = tr_peerGetPieceSpeed( peer, TR_PEER_TO_CLIENT );
1879        stat->peerIsChoked       = peer->peerIsChoked;
1880        stat->peerIsInterested   = peer->peerIsInterested;
1881        stat->clientIsChoked     = peer->clientIsChoked;
1882        stat->clientIsInterested = peer->clientIsInterested;
1883        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1884        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1885        stat->isUploadingTo      = clientIsUploadingTo( peer );
1886        stat->isSeed             = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
1887
1888        pch = stat->flagStr;
1889        if( t->optimistic == peer ) *pch++ = 'O';
1890        if( stat->isDownloadingFrom ) *pch++ = 'D';
1891        else if( stat->clientIsInterested ) *pch++ = 'd';
1892        if( stat->isUploadingTo ) *pch++ = 'U';
1893        else if( stat->peerIsInterested ) *pch++ = 'u';
1894        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
1895        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1896        if( stat->isEncrypted ) *pch++ = 'E';
1897        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1898        if( stat->isIncoming ) *pch++ = 'I';
1899        *pch = '\0';
1900    }
1901
1902    *setmeCount = size;
1903    tr_free( peers );
1904
1905    managerUnlock( manager );
1906    return ret;
1907}
1908
1909/**
1910***
1911**/
1912
1913struct ChokeData
1914{
1915    tr_bool         doUnchoke;
1916    tr_bool         isInterested;
1917    tr_bool         isChoked;
1918    int             rate;
1919    tr_peer *       peer;
1920};
1921
1922static int
1923compareChoke( const void * va,
1924              const void * vb )
1925{
1926    const struct ChokeData * a = va;
1927    const struct ChokeData * b = vb;
1928
1929    if( a->rate != b->rate ) /* prefer higher overall speeds */
1930        return a->rate > b->rate ? -1 : 1;
1931
1932    if( a->isChoked != b->isChoked ) /* prefer unchoked */
1933        return a->isChoked ? 1 : -1;
1934
1935    return 0;
1936}
1937
1938static int
1939isNew( const tr_peer * peer )
1940{
1941    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1942}
1943
1944static int
1945isSame( const tr_peer * peer )
1946{
1947    return peer && peer->client && strstr( peer->client, "Transmission" );
1948}
1949
1950/**
1951***
1952**/
1953
1954static void
1955rechoke( Torrent * t )
1956{
1957    int                i, peerCount, size, unchokedInterested;
1958    tr_peer **         peers = getConnectedPeers( t, &peerCount );
1959    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1960    const int          chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
1961
1962    assert( torrentIsLocked( t ) );
1963
1964    /* sort the peers by preference and rate */
1965    for( i = 0, size = 0; i < peerCount; ++i )
1966    {
1967        tr_peer * peer = peers[i];
1968        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1969
1970        if( peer->progress >= 1.0 ) /* choke all seeds */
1971        {
1972            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1973        }
1974        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
1975        {
1976            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1977        }
1978        else if( chokeAll ) /* choke everyone if we're not uploading */
1979        {
1980            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1981        }
1982        else
1983        {
1984            struct ChokeData * n = &choke[size++];
1985            n->peer         = peer;
1986            n->isInterested = peer->peerIsInterested;
1987            n->isChoked     = peer->peerIsChoked;
1988            n->rate         = tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER ) * 1024;
1989        }
1990    }
1991
1992    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
1993
1994    /**
1995     * Reciprocation and number of uploads capping is managed by unchoking
1996     * the N peers which have the best upload rate and are interested.
1997     * This maximizes the client's download rate. These N peers are
1998     * referred to as downloaders, because they are interested in downloading
1999     * from the client.
2000     *
2001     * Peers which have a better upload rate (as compared to the downloaders)
2002     * but aren't interested get unchoked. If they become interested, the
2003     * downloader with the worst upload rate gets choked. If a client has
2004     * a complete file, it uses its upload rate rather than its download
2005     * rate to decide which peers to unchoke.
2006     */
2007    unchokedInterested = 0;
2008    for( i=0; i<size && unchokedInterested<MAX_UNCHOKED_PEERS; ++i ) {
2009        choke[i].doUnchoke = 1;
2010        if( choke[i].isInterested )
2011            ++unchokedInterested;
2012    }
2013
2014    /* optimistic unchoke */
2015    if( i < size )
2016    {
2017        int n;
2018        struct ChokeData * c;
2019        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
2020
2021        for( ; i<size; ++i )
2022        {
2023            if( choke[i].isInterested )
2024            {
2025                const tr_peer * peer = choke[i].peer;
2026                int x = 1, y;
2027                if( isNew( peer ) ) x *= 3;
2028                if( isSame( peer ) ) x *= 3;
2029                for( y=0; y<x; ++y )
2030                    tr_ptrArrayAppend( &randPool, &choke[i] );
2031            }
2032        }
2033
2034        if(( n = tr_ptrArraySize( &randPool )))
2035        {
2036            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
2037            c->doUnchoke = 1;
2038            t->optimistic = c->peer;
2039        }
2040
2041        tr_ptrArrayDestruct( &randPool, NULL );
2042    }
2043
2044    for( i=0; i<size; ++i )
2045        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
2046
2047    /* cleanup */
2048    tr_free( choke );
2049    tr_free( peers );
2050}
2051
2052static int
2053rechokePulse( void * vtorrent )
2054{
2055    Torrent * t = vtorrent;
2056
2057    torrentLock( t );
2058    rechoke( t );
2059    torrentUnlock( t );
2060    return TRUE;
2061}
2062
2063/***
2064****
2065****  Life and Death
2066****
2067***/
2068
2069static int
2070shouldPeerBeClosed( const Torrent * t,
2071                    const tr_peer * peer,
2072                    int             peerCount )
2073{
2074    const tr_torrent *       tor = t->tor;
2075    const time_t             now = time( NULL );
2076    const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
2077
2078    /* if it's marked for purging, close it */
2079    if( peer->doPurge )
2080    {
2081        tordbg( t, "purging peer %s because its doPurge flag is set",
2082                tr_peerIoAddrStr( &atom->addr, atom->port ) );
2083        return TRUE;
2084    }
2085
2086    /* if we're seeding and the peer has everything we have,
2087     * and enough time has passed for a pex exchange, then disconnect */
2088    if( tr_torrentIsSeed( tor ) )
2089    {
2090        int peerHasEverything;
2091        if( atom->flags & ADDED_F_SEED_FLAG )
2092            peerHasEverything = TRUE;
2093        else if( peer->progress < tr_cpPercentDone( tor->completion ) )
2094            peerHasEverything = FALSE;
2095        else {
2096            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( tor->completion ) );
2097            tr_bitfieldDifference( tmp, peer->have );
2098            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2099            tr_bitfieldFree( tmp );
2100        }
2101
2102        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 )))
2103        {
2104            tordbg( t, "purging peer %s because we're both seeds",
2105                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
2106            return TRUE;
2107        }
2108    }
2109
2110    /* disconnect if it's been too long since piece data has been transferred.
2111     * this is on a sliding scale based on number of available peers... */
2112    {
2113        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2114        /* if we have >= relaxIfFewerThan, strictness is 100%.
2115         * if we have zero connections, strictness is 0% */
2116        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
2117            ? 1.0
2118            : peerCount / (float)relaxStrictnessIfFewerThanN;
2119        const int lo = MIN_UPLOAD_IDLE_SECS;
2120        const int hi = MAX_UPLOAD_IDLE_SECS;
2121        const int limit = hi - ( ( hi - lo ) * strictness );
2122        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
2123/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
2124        if( idleTime > limit ) {
2125            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
2126                       tr_peerIoAddrStr( &atom->addr, atom->port ), idleTime );
2127            return TRUE;
2128        }
2129    }
2130
2131    return FALSE;
2132}
2133
2134static tr_peer **
2135getPeersToClose( Torrent * t, int * setmeSize )
2136{
2137    int i, peerCount, outsize;
2138    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
2139    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2140
2141    assert( torrentIsLocked( t ) );
2142
2143    for( i = outsize = 0; i < peerCount; ++i )
2144        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
2145            ret[outsize++] = peers[i];
2146
2147    *setmeSize = outsize;
2148    return ret;
2149}
2150
2151static int
2152compareCandidates( const void * va,
2153                   const void * vb )
2154{
2155    const struct peer_atom * a = *(const struct peer_atom**) va;
2156    const struct peer_atom * b = *(const struct peer_atom**) vb;
2157
2158    /* <Charles> Here we would probably want to try reconnecting to
2159     * peers that had most recently given us data. Lots of users have
2160     * trouble with resets due to their routers and/or ISPs. This way we
2161     * can quickly recover from an unwanted reset. So we sort
2162     * piece_data_time in descending order.
2163     */
2164
2165    if( a->piece_data_time != b->piece_data_time )
2166        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2167
2168    if( a->numFails != b->numFails )
2169        return a->numFails < b->numFails ? -1 : 1;
2170
2171    if( a->time != b->time )
2172        return a->time < b->time ? -1 : 1;
2173
2174    /* all other things being equal, prefer peers whose
2175     * information comes from a more reliable source */
2176    if( a->from != b->from )
2177        return a->from < b->from ? -1 : 1;
2178
2179    return 0;
2180}
2181
2182static int
2183getReconnectIntervalSecs( const struct peer_atom * atom )
2184{
2185    int          sec;
2186    const time_t now = time( NULL );
2187
2188    /* if we were recently connected to this peer and transferring piece
2189     * data, try to reconnect to them sooner rather that later -- we don't
2190     * want network troubles to get in the way of a good peer. */
2191    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2192        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2193
2194    /* don't allow reconnects more often than our minimum */
2195    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2196        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2197
2198    /* otherwise, the interval depends on how many times we've tried
2199     * and failed to connect to the peer */
2200    else switch( atom->numFails ) {
2201        case 0: sec = 0; break;
2202        case 1: sec = 5; break;
2203        case 2: sec = 2 * 60; break;
2204        case 3: sec = 15 * 60; break;
2205        case 4: sec = 30 * 60; break;
2206        case 5: sec = 60 * 60; break;
2207        default: sec = 120 * 60; break;
2208    }
2209
2210    return sec;
2211}
2212
2213static struct peer_atom **
2214getPeerCandidates( Torrent * t, int * setmeSize )
2215{
2216    int                 i, atomCount, retCount;
2217    struct peer_atom ** atoms;
2218    struct peer_atom ** ret;
2219    const time_t        now = time( NULL );
2220    const int           seed = tr_torrentIsSeed( t->tor );
2221
2222    assert( torrentIsLocked( t ) );
2223
2224    atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
2225    ret = tr_new( struct peer_atom*, atomCount );
2226    for( i = retCount = 0; i < atomCount; ++i )
2227    {
2228        int                interval;
2229        struct peer_atom * atom = atoms[i];
2230
2231        /* peer fed us too much bad data ... we only keep it around
2232         * now to weed it out in case someone sends it to us via pex */
2233        if( atom->myflags & MYFLAG_BANNED )
2234            continue;
2235
2236        /* peer was unconnectable before, so we're not going to keep trying.
2237         * this is needs a separate flag from `banned', since if they try
2238         * to connect to us later, we'll let them in */
2239        if( atom->myflags & MYFLAG_UNREACHABLE )
2240            continue;
2241
2242        /* we don't need two connections to the same peer... */
2243        if( peerIsInUse( t, &atom->addr ) )
2244            continue;
2245
2246        /* no need to connect if we're both seeds... */
2247        if( seed && ( ( atom->flags & ADDED_F_SEED_FLAG ) ||
2248                      ( atom->uploadOnly == UPLOAD_ONLY_YES ) ) )
2249            continue;
2250
2251        /* don't reconnect too often */
2252        interval = getReconnectIntervalSecs( atom );
2253        if( ( now - atom->time ) < interval )
2254        {
2255            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2256                    i, tr_peerIoAddrStr( &atom->addr, atom->port ), interval );
2257            continue;
2258        }
2259
2260        /* Don't connect to peers in our blocklist */
2261        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2262            continue;
2263
2264        ret[retCount++] = atom;
2265    }
2266
2267    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2268    *setmeSize = retCount;
2269    return ret;
2270}
2271
2272static int
2273reconnectPulse( void * vtorrent )
2274{
2275    Torrent *     t = vtorrent;
2276    static time_t prevTime = 0;
2277    static int    newConnectionsThisSecond = 0;
2278    time_t        now;
2279
2280    torrentLock( t );
2281
2282    now = time( NULL );
2283    if( prevTime != now )
2284    {
2285        prevTime = now;
2286        newConnectionsThisSecond = 0;
2287    }
2288
2289    if( !t->isRunning )
2290    {
2291        removeAllPeers( t );
2292    }
2293    else
2294    {
2295        int i, nCandidates, nBad;
2296        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
2297        struct tr_peer ** connections = getPeersToClose( t, &nBad );
2298
2299        if( nBad || nCandidates )
2300            tordbg( t, "reconnect pulse for [%s]: %d bad connections, "
2301                    "%d connection candidates, %d atoms, max per pulse is %d",
2302                    t->tor->info.name, nBad, nCandidates,
2303                    tr_ptrArraySize( &t->pool ),
2304                    (int)MAX_RECONNECTIONS_PER_PULSE );
2305
2306        /* disconnect some peers.
2307           if we transferred piece data, then they might be good peers,
2308           so reset their `numFails' weight to zero.  otherwise we connected
2309           to them fruitlessly, so mark it as another fail */
2310        for( i = 0; i < nBad; ++i ) {
2311            tr_peer * peer = connections[i];
2312            struct peer_atom * atom = getExistingAtom( t, &peer->addr );
2313            if( atom->piece_data_time )
2314                atom->numFails = 0;
2315            else
2316                ++atom->numFails;
2317            tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
2318            removePeer( t, peer );
2319        }
2320
2321        /* add some new ones */
2322        for( i = 0;    ( i < nCandidates )
2323           && ( i < MAX_RECONNECTIONS_PER_PULSE )
2324           && ( getPeerCount( t ) < getMaxPeerCount( t->tor ) )
2325           && ( newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND ); ++i )
2326        {
2327            tr_peerMgr *       mgr = t->manager;
2328            struct peer_atom * atom = candidates[i];
2329            tr_peerIo *        io;
2330
2331            tordbg( t, "Starting an OUTGOING connection with %s",
2332                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2333
2334            io = tr_peerIoNewOutgoing( mgr->session, &atom->addr, atom->port, t->hash );
2335            if( io == NULL )
2336            {
2337                atom->myflags |= MYFLAG_UNREACHABLE;
2338            }
2339            else
2340            {
2341                tr_handshake * handshake = tr_handshakeNew( io,
2342                                                            mgr->session->encryptionMode,
2343                                                            myHandshakeDoneCB,
2344                                                            mgr );
2345
2346                assert( tr_peerIoGetTorrentHash( io ) );
2347
2348                ++newConnectionsThisSecond;
2349
2350                tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
2351                                         handshakeCompare );
2352            }
2353
2354            atom->time = time( NULL );
2355        }
2356
2357        /* cleanup */
2358        tr_free( connections );
2359        tr_free( candidates );
2360    }
2361
2362    torrentUnlock( t );
2363    return TRUE;
2364}
2365
2366/****
2367*****
2368*****  BANDWIDTH ALLOCATION
2369*****
2370****/
2371
2372static void
2373pumpAllPeers( tr_peerMgr * mgr )
2374{
2375    const int torrentCount = tr_ptrArraySize( &mgr->torrents );
2376    int       i, j;
2377
2378    for( i=0; i<torrentCount; ++i )
2379    {
2380        Torrent * t = tr_ptrArrayNth( &mgr->torrents, i );
2381        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
2382        {
2383            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
2384            tr_peerMsgsPulse( peer->msgs );
2385        }
2386    }
2387}
2388
2389static int
2390bandwidthPulse( void * vmgr )
2391{
2392    tr_handshake * handshake;
2393    tr_peerMgr * mgr = vmgr;
2394    managerLock( mgr );
2395
2396    /* FIXME: this next line probably isn't necessary... */
2397    pumpAllPeers( mgr );
2398
2399    /* allocate bandwidth to the peers */
2400    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
2401    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
2402
2403    /* free all the finished handshakes */
2404    while(( handshake = tr_ptrArrayPop( &mgr->finishedHandshakes )))
2405        tr_handshakeFree( handshake );
2406
2407    managerUnlock( mgr );
2408    return TRUE;
2409}
Note: See TracBrowser for help on using the repository browser.