source: trunk/libtransmission/peer-mgr.c @ 7456

Last change on this file since 7456 was 7456, checked in by charles, 12 years ago

(trunk libT) minor cleanups found while diffing for backport to 1.4x in r7455

  • Property svn:keywords set to Date Rev Author Id
File size: 67.7 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 7456 2008-12-22 00:52:44Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h> /* memcpy, memcmp, strstr */
16#include <stdlib.h> /* qsort */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "bandwidth.h"
23#include "bencode.h"
24#include "blocklist.h"
25#include "clients.h"
26#include "completion.h"
27#include "crypto.h"
28#include "handshake.h"
29#include "inout.h" /* tr_ioTestPiece */
30#include "net.h"
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-mgr-private.h"
34#include "peer-msgs.h"
35#include "ptrarray.h"
36#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
37#include "torrent.h"
38#include "trevent.h"
39#include "utils.h"
40#include "webseed.h"
41
42enum
43{
44    /* how frequently to change which peers are choked */
45    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
46
47    /* minimum interval for refilling peers' request lists */
48    REFILL_PERIOD_MSEC = 333,
49
50    /* when many peers are available, keep idle ones this long */
51    MIN_UPLOAD_IDLE_SECS = ( 30 ),
52
53    /* when few peers are available, keep idle ones this long */
54    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
55
56    /* how frequently to decide which peers live and die */
57    RECONNECT_PERIOD_MSEC = ( 2 * 1000 ),
58   
59    /* how frequently to reallocate bandwidth */
60    BANDWIDTH_PERIOD_MSEC = 500,
61
62    /* max # of peers to ask fer per torrent per reconnect pulse */
63    MAX_RECONNECTIONS_PER_PULSE = 4,
64
65    /* max number of peers to ask for per second overall.
66    * this throttle is to avoid overloading the router */
67    MAX_CONNECTIONS_PER_SECOND = 8,
68
69    /* number of unchoked peers per torrent.
70     * FIXME: this probably ought to be configurable */
71    MAX_UNCHOKED_PEERS = 14,
72
73    /* number of bad pieces a peer is allowed to send before we ban them */
74    MAX_BAD_PIECES_PER_PEER = 5,
75
76    /* use for bitwise operations w/peer_atom.myflags */
77    MYFLAG_BANNED = 1,
78
79    /* unreachable for now... but not banned.
80     * if they try to connect to us it's okay */
81    MYFLAG_UNREACHABLE = 2,
82
83    /* the minimum we'll wait before attempting to reconnect to a peer */
84    MINIMUM_RECONNECT_INTERVAL_SECS = 5
85};
86
87
88/**
89***
90**/
91
92enum
93{
94    UPLOAD_ONLY_UKNOWN,
95    UPLOAD_ONLY_YES,
96    UPLOAD_ONLY_NO
97};
98
99/**
100 * Peer information that should be kept even before we've connected and
101 * after we've disconnected.  These are kept in a pool of peer_atoms to decide
102 * which ones would make good candidates for connecting to, and to watch out
103 * for banned peers.
104 *
105 * @see tr_peer
106 * @see tr_peermsgs
107 */
108struct peer_atom
109{
110    uint8_t     from;
111    uint8_t     flags;       /* these match the added_f flags */
112    uint8_t     myflags;     /* flags that aren't defined in added_f */
113    uint8_t     uploadOnly;  /* UPLOAD_ONLY_ */
114    tr_port     port;
115    uint16_t    numFails;
116    tr_address  addr;
117    time_t      time;        /* when the peer's connection status last changed */
118    time_t      piece_data_time;
119};
120
121typedef struct
122{
123    tr_bool         isRunning;
124
125    uint8_t         hash[SHA_DIGEST_LENGTH];
126    int         *   pendingRequestCount;
127    tr_ptrArray *   outgoingHandshakes; /* tr_handshake */
128    tr_ptrArray *   pool; /* struct peer_atom */
129    tr_ptrArray *   peers; /* tr_peer */
130    tr_ptrArray *   webseeds; /* tr_webseed */
131    tr_timer *      reconnectTimer;
132    tr_timer *      rechokeTimer;
133    tr_timer *      refillTimer;
134    tr_torrent *    tor;
135    tr_peer *       optimistic; /* the optimistic peer, or NULL if none */
136
137    struct tr_peerMgr * manager;
138}
139Torrent;
140
141struct tr_peerMgr
142{
143    tr_session      * session;
144    tr_ptrArray     * torrents; /* Torrent */
145    tr_ptrArray     * incomingHandshakes; /* tr_handshake */
146    tr_ptrArray     * finishedHandshakes; /* tr_handshake */
147    tr_timer        * bandwidthTimer;
148};
149
150#define tordbg( t, ... ) \
151    do { \
152        if( tr_deepLoggingIsActive( ) ) \
153            tr_deepLog( __FILE__, __LINE__, t->tor->info.name, __VA_ARGS__ ); \
154    } while( 0 )
155
156#define dbgmsg( ... ) \
157    do { \
158        if( tr_deepLoggingIsActive( ) ) \
159            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
160    } while( 0 )
161
162/**
163***
164**/
165
166static void
167managerLock( const struct tr_peerMgr * manager )
168{
169    tr_globalLock( manager->session );
170}
171
172static void
173managerUnlock( const struct tr_peerMgr * manager )
174{
175    tr_globalUnlock( manager->session );
176}
177
178static void
179torrentLock( Torrent * torrent )
180{
181    managerLock( torrent->manager );
182}
183
184static void
185torrentUnlock( Torrent * torrent )
186{
187    managerUnlock( torrent->manager );
188}
189
190static int
191torrentIsLocked( const Torrent * t )
192{
193    return tr_globalIsLocked( t->manager->session );
194}
195
196/**
197***
198**/
199
200static int
201handshakeCompareToAddr( const void * va, const void * vb )
202{
203    const tr_handshake * a = va;
204
205    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
206}
207
208static int
209handshakeCompare( const void * a, const void * b )
210{
211    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
212}
213
214static tr_handshake*
215getExistingHandshake( tr_ptrArray      * handshakes,
216                      const tr_address * addr )
217{
218    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
219}
220
221static int
222comparePeerAtomToAddress( const void * va, const void * vb )
223{
224    const struct peer_atom * a = va;
225
226    return tr_compareAddresses( &a->addr, vb );
227}
228
229static int
230comparePeerAtoms( const void * va, const void * vb )
231{
232    const struct peer_atom * b = vb;
233
234    return comparePeerAtomToAddress( va, &b->addr );
235}
236
237/**
238***
239**/
240
241static int
242torrentCompare( const void * va,
243                const void * vb )
244{
245    const Torrent * a = va;
246    const Torrent * b = vb;
247
248    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
249}
250
251static int
252torrentCompareToHash( const void * va,
253                      const void * vb )
254{
255    const Torrent * a = va;
256    const uint8_t * b_hash = vb;
257
258    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
259}
260
261static Torrent*
262getExistingTorrent( tr_peerMgr *    manager,
263                    const uint8_t * hash )
264{
265    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
266                                             hash,
267                                             torrentCompareToHash );
268}
269
270static int
271peerCompare( const void * va, const void * vb )
272{
273    const tr_peer * a = va;
274    const tr_peer * b = vb;
275
276    return tr_compareAddresses( &a->addr, &b->addr );
277}
278
279static int
280peerCompareToAddr( const void * va, const void * vb )
281{
282    const tr_peer * a = va;
283
284    return tr_compareAddresses( &a->addr, vb );
285}
286
287static tr_peer*
288getExistingPeer( Torrent          * torrent,
289                 const tr_address * addr )
290{
291    assert( torrentIsLocked( torrent ) );
292    assert( addr );
293
294    return tr_ptrArrayFindSorted( torrent->peers, addr, peerCompareToAddr );
295}
296
297static struct peer_atom*
298getExistingAtom( const Torrent    * t,
299                 const tr_address * addr )
300{
301    assert( torrentIsLocked( t ) );
302    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
303}
304
305static tr_bool
306peerIsInUse( const Torrent    * ct,
307             const tr_address * addr )
308{
309    Torrent * t = (Torrent*) ct;
310
311    assert( torrentIsLocked ( t ) );
312
313    return getExistingPeer( t, addr )
314        || getExistingHandshake( t->outgoingHandshakes, addr )
315        || getExistingHandshake( t->manager->incomingHandshakes, addr );
316}
317
318static tr_peer*
319peerConstructor( tr_torrent * tor, const tr_address * addr )
320{
321    tr_peer * p;
322    p = tr_new0( tr_peer, 1 );
323    p->addr = *addr;
324    p->bandwidth = tr_bandwidthNew( tor->session, tor->bandwidth );
325    return p;
326}
327
328static tr_peer*
329getPeer( Torrent          * torrent,
330         const tr_address * addr )
331{
332    tr_peer * peer;
333
334    assert( torrentIsLocked( torrent ) );
335
336    peer = getExistingPeer( torrent, addr );
337
338    if( peer == NULL )
339    {
340        peer = peerConstructor( torrent->tor, addr );
341        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
342    }
343
344    return peer;
345}
346
347static void
348peerDestructor( tr_peer * peer )
349{
350    assert( peer );
351
352    if( peer->msgs != NULL )
353    {
354        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
355        tr_peerMsgsFree( peer->msgs );
356    }
357
358    tr_peerIoFree( peer->io );
359
360    tr_bitfieldFree( peer->have );
361    tr_bitfieldFree( peer->blame );
362    tr_free( peer->client );
363
364    tr_bandwidthFree( peer->bandwidth );
365
366    tr_free( peer );
367}
368
369static void
370removePeer( Torrent * t,
371            tr_peer * peer )
372{
373    tr_peer *          removed;
374    struct peer_atom * atom;
375
376    assert( torrentIsLocked( t ) );
377
378    atom = getExistingAtom( t, &peer->addr );
379    assert( atom );
380    atom->time = time( NULL );
381
382    removed = tr_ptrArrayRemoveSorted( t->peers, peer, peerCompare );
383    assert( removed == peer );
384    peerDestructor( removed );
385}
386
387static void
388removeAllPeers( Torrent * t )
389{
390    while( !tr_ptrArrayEmpty( t->peers ) )
391        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
392}
393
394static void
395torrentDestructor( void * vt )
396{
397    Torrent * t = vt;
398    uint8_t   hash[SHA_DIGEST_LENGTH];
399
400    assert( t );
401    assert( !t->isRunning );
402    assert( t->peers );
403    assert( torrentIsLocked( t ) );
404    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
405    assert( tr_ptrArrayEmpty( t->peers ) );
406
407    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
408
409    tr_timerFree( &t->reconnectTimer );
410    tr_timerFree( &t->rechokeTimer );
411    tr_timerFree( &t->refillTimer );
412
413    tr_ptrArrayFree( t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
414    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
415    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
416    tr_ptrArrayFree( t->peers, NULL );
417
418    tr_free( t->pendingRequestCount );
419    tr_free( t );
420}
421
422static void peerCallbackFunc( void * vpeer,
423                              void * vevent,
424                              void * vt );
425
426static Torrent*
427torrentConstructor( tr_peerMgr * manager,
428                    tr_torrent * tor )
429{
430    int       i;
431    Torrent * t;
432
433    t = tr_new0( Torrent, 1 );
434    t->manager = manager;
435    t->tor = tor;
436    t->pool = tr_ptrArrayNew( );
437    t->peers = tr_ptrArrayNew( );
438    t->webseeds = tr_ptrArrayNew( );
439    t->outgoingHandshakes = tr_ptrArrayNew( );
440    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
441
442    for( i = 0; i < tor->info.webseedCount; ++i )
443    {
444        tr_webseed * w =
445            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc,
446                           t );
447        tr_ptrArrayAppend( t->webseeds, w );
448    }
449
450    return t;
451}
452
453
454static int bandwidthPulse( void * vmgr );
455
456
457tr_peerMgr*
458tr_peerMgrNew( tr_session * session )
459{
460    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
461
462    m->session = session;
463    m->torrents = tr_ptrArrayNew( );
464    m->incomingHandshakes = tr_ptrArrayNew( );
465    m->finishedHandshakes = tr_ptrArrayNew( );
466    m->bandwidthTimer = tr_timerNew( session, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
467    return m;
468}
469
470void
471tr_peerMgrFree( tr_peerMgr * manager )
472{
473    tr_handshake * handshake;
474
475    managerLock( manager );
476
477    tr_timerFree( &manager->bandwidthTimer );
478
479    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
480     * the item from manager->handshakes, so this is a little roundabout... */
481    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
482        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
483
484    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
485
486    while(( handshake = tr_ptrArrayPop( manager->finishedHandshakes )))
487        tr_handshakeFree( handshake );
488
489    tr_ptrArrayFree( manager->finishedHandshakes, NULL );
490
491    /* free the torrents. */
492    tr_ptrArrayFree( manager->torrents, torrentDestructor );
493
494    managerUnlock( manager );
495    tr_free( manager );
496}
497
498static tr_peer**
499getConnectedPeers( Torrent * t, int * setmeCount )
500{
501    int i, peerCount, connectionCount;
502    tr_peer **peers;
503    tr_peer **ret;
504
505    assert( torrentIsLocked( t ) );
506
507    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
508    ret = tr_new( tr_peer *, peerCount );
509
510    for( i = connectionCount = 0; i < peerCount; ++i )
511        if( peers[i]->msgs )
512            ret[connectionCount++] = peers[i];
513
514    *setmeCount = connectionCount;
515    return ret;
516}
517
518static int
519clientIsDownloadingFrom( const tr_peer * peer )
520{
521    return peer->clientIsInterested && !peer->clientIsChoked;
522}
523
524static int
525clientIsUploadingTo( const tr_peer * peer )
526{
527    return peer->peerIsInterested && !peer->peerIsChoked;
528}
529
530/***
531****
532***/
533
534tr_bool
535tr_peerMgrPeerIsSeed( const tr_peerMgr  * mgr,
536                      const uint8_t     * torrentHash,
537                      const tr_address  * addr )
538{
539    tr_bool isSeed = FALSE;
540    const Torrent * t = NULL;
541    const struct peer_atom * atom = NULL;
542
543    t = getExistingTorrent( (tr_peerMgr*)mgr, torrentHash );
544    if( t )
545        atom = getExistingAtom( t, addr );
546    if( atom )
547        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
548
549    return isSeed;
550}
551
552/****
553*****
554*****  REFILL
555*****
556****/
557
558static void
559assertValidPiece( Torrent * t, tr_piece_index_t piece )
560{
561    assert( t );
562    assert( t->tor );
563    assert( piece < t->tor->info.pieceCount );
564}
565
566static int
567getPieceRequests( Torrent * t, tr_piece_index_t piece )
568{
569    assertValidPiece( t, piece );
570
571    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
572}
573
574static void
575incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
576{
577    assertValidPiece( t, piece );
578
579    if( t->pendingRequestCount == NULL )
580        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
581    t->pendingRequestCount[piece]++;
582}
583
584static void
585decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
586{
587    assertValidPiece( t, piece );
588
589    if( t->pendingRequestCount )
590        t->pendingRequestCount[piece]--;
591}
592
593struct tr_refill_piece
594{
595    tr_priority_t    priority;
596    uint32_t         piece;
597    uint32_t         peerCount;
598    int              random;
599    int              pendingRequestCount;
600    int              missingBlockCount;
601};
602
603static int
604compareRefillPiece( const void * aIn, const void * bIn )
605{
606    const struct tr_refill_piece * a = aIn;
607    const struct tr_refill_piece * b = bIn;
608
609    /* if one piece has a higher priority, it goes first */
610    if( a->priority != b->priority )
611        return a->priority > b->priority ? -1 : 1;
612
613    /* have a per-priority endgame */
614    if( a->pendingRequestCount != b->pendingRequestCount )
615        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
616
617    /* fewer missing pieces goes first */
618    if( a->missingBlockCount != b->missingBlockCount )
619        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
620
621    /* otherwise if one has fewer peers, it goes first */
622    if( a->peerCount != b->peerCount )
623        return a->peerCount < b->peerCount ? -1 : 1;
624
625    /* otherwise go with our random seed */
626    if( a->random != b->random )
627        return a->random < b->random ? -1 : 1;
628
629    return 0;
630}
631
632static tr_piece_index_t *
633getPreferredPieces( Torrent * t, tr_piece_index_t * pieceCount )
634{
635    const tr_torrent  * tor = t->tor;
636    const tr_info     * inf = &tor->info;
637    tr_piece_index_t    i;
638    tr_piece_index_t    poolSize = 0;
639    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
640    int                 peerCount;
641    tr_peer**           peers;
642
643    assert( torrentIsLocked( t ) );
644
645    peers = getConnectedPeers( t, &peerCount );
646
647    /* make a list of the pieces that we want but don't have */
648    for( i = 0; i < inf->pieceCount; ++i )
649        if( !tor->info.pieces[i].dnd
650                && !tr_cpPieceIsComplete( tor->completion, i ) )
651            pool[poolSize++] = i;
652
653    /* sort the pool by which to request next */
654    if( poolSize > 1 )
655    {
656        tr_piece_index_t j;
657        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
658
659        for( j = 0; j < poolSize; ++j )
660        {
661            int k;
662            const tr_piece_index_t piece = pool[j];
663            struct tr_refill_piece * setme = p + j;
664
665            setme->piece = piece;
666            setme->priority = inf->pieces[piece].priority;
667            setme->peerCount = 0;
668            setme->random = tr_cryptoWeakRandInt( INT_MAX );
669            setme->pendingRequestCount = getPieceRequests( t, piece );
670            setme->missingBlockCount
671                         = tr_cpMissingBlocksInPiece( tor->completion, piece );
672
673            for( k = 0; k < peerCount; ++k )
674            {
675                const tr_peer * peer = peers[k];
676                if( peer->peerIsInterested
677                        && !peer->clientIsChoked
678                        && tr_bitfieldHas( peer->have, piece ) )
679                    ++setme->peerCount;
680            }
681        }
682
683        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
684               compareRefillPiece );
685
686        for( j = 0; j < poolSize; ++j )
687            pool[j] = p[j].piece;
688
689        tr_free( p );
690    }
691
692    tr_free( peers );
693
694    *pieceCount = poolSize;
695    return pool;
696}
697
698struct tr_blockIterator
699{
700    Torrent * t;
701    tr_block_index_t blockIndex, blockCount, *blocks;
702    tr_piece_index_t pieceIndex, pieceCount, *pieces;
703};
704
705static struct tr_blockIterator*
706blockIteratorNew( Torrent * t )
707{
708    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
709    i->t = t;
710    i->pieces = getPreferredPieces( t, &i->pieceCount );
711    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCount );
712    return i;
713}
714
715static int
716blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
717{
718    int found;
719    Torrent * t = i->t;
720    tr_torrent * tor = t->tor;
721
722    while( ( i->blockIndex == i->blockCount )
723        && ( i->pieceIndex < i->pieceCount ) )
724    {
725        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
726        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
727        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
728        tr_block_index_t block;
729
730        assert( index < tor->info.pieceCount );
731
732        i->blockCount = 0;
733        i->blockIndex = 0;
734        for( block=b; block!=e; ++block )
735            if( !tr_cpBlockIsComplete( tor->completion, block ) )
736                i->blocks[i->blockCount++] = block;
737    }
738
739    if(( found = ( i->blockIndex < i->blockCount )))
740        *setme = i->blocks[i->blockIndex++];
741
742    return found;
743}
744
745static void
746blockIteratorFree( struct tr_blockIterator * i )
747{
748    tr_free( i->blocks );
749    tr_free( i->pieces );
750    tr_free( i );
751}
752
753static tr_peer**
754getPeersUploadingToClient( Torrent * t,
755                           int *     setmeCount )
756{
757    int j;
758    int peerCount = 0;
759    int retCount = 0;
760    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
761    tr_peer ** ret = tr_new( tr_peer *, peerCount );
762
763    j = 0; /* this is a temporary test to make sure we walk through all the peers */
764    if( peerCount )
765    {
766        /* Get a list of peers we're downloading from.
767           Pick a different starting point each time so all peers
768           get a chance at being the first in line */
769        const int fencepost = tr_cryptoWeakRandInt( peerCount );
770        int i = fencepost;
771        do {
772            if( clientIsDownloadingFrom( peers[i] ) )
773                ret[retCount++] = peers[i];
774            i = ( i + 1 ) % peerCount;
775            ++j;
776        } while( i != fencepost );
777    }
778    assert( j == peerCount );
779    *setmeCount = retCount;
780    return ret;
781}
782
783static uint32_t
784getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
785{
786    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
787    const uint64_t blockPos = tor->blockSize * b;
788    assert( blockPos >= piecePos );
789    return (uint32_t)( blockPos - piecePos );
790}
791
792static int
793refillPulse( void * vtorrent )
794{
795    tr_block_index_t block;
796    int peerCount;
797    int webseedCount;
798    tr_peer ** peers;
799    tr_webseed ** webseeds;
800    struct tr_blockIterator * blockIterator;
801    Torrent * t = vtorrent;
802    tr_torrent * tor = t->tor;
803
804    if( !t->isRunning )
805        return TRUE;
806    if( tr_torrentIsSeed( t->tor ) )
807        return TRUE;
808
809    torrentLock( t );
810    tordbg( t, "Refilling Request Buffers..." );
811
812    blockIterator = blockIteratorNew( t );
813    peers = getPeersUploadingToClient( t, &peerCount );
814    webseedCount = tr_ptrArraySize( t->webseeds );
815    webseeds = tr_memdup( tr_ptrArrayBase( t->webseeds ),
816                          webseedCount * sizeof( tr_webseed* ) );
817
818    while( ( webseedCount || peerCount )
819        && blockIteratorNext( blockIterator, &block ) )
820    {
821        int j;
822        int handled = FALSE;
823
824        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
825        const uint32_t offset = getBlockOffsetInPiece( tor, block );
826        const uint32_t length = tr_torBlockCountBytes( tor, block );
827
828        /* find a peer who can ask for this block */
829        for( j=0; !handled && j<peerCount; )
830        {
831            const tr_addreq_t val = tr_peerMsgsAddRequest( peers[j]->msgs, index, offset, length );
832            switch( val )
833            {
834                case TR_ADDREQ_FULL:
835                case TR_ADDREQ_CLIENT_CHOKED:
836                    peers[j] = peers[--peerCount];
837                    break;
838
839                case TR_ADDREQ_MISSING:
840                case TR_ADDREQ_DUPLICATE:
841                    ++j;
842                    break;
843
844                case TR_ADDREQ_OK:
845                    incrementPieceRequests( t, index );
846                    handled = TRUE;
847                    break;
848
849                default:
850                    assert( 0 && "unhandled value" );
851                    break;
852            }
853        }
854
855        /* maybe one of the webseeds can do it */
856        for( j=0; !handled && j<webseedCount; )
857        {
858            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j], index, offset, length );
859            switch( val )
860            {
861                case TR_ADDREQ_FULL:
862                    webseeds[j] = webseeds[--webseedCount];
863                    break;
864
865                case TR_ADDREQ_OK:
866                    incrementPieceRequests( t, index );
867                    handled = TRUE;
868                    break;
869
870                default:
871                    assert( 0 && "unhandled value" );
872                    break;
873            }
874        }
875    }
876
877    /* cleanup */
878    blockIteratorFree( blockIterator );
879    tr_free( webseeds );
880    tr_free( peers );
881
882    t->refillTimer = NULL;
883    torrentUnlock( t );
884    return FALSE;
885}
886
887static void
888broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
889{
890    int i, size;
891    tr_peer ** peers;
892
893    assert( torrentIsLocked( t ) );
894
895    tordbg( t, "got a block; cancelling any duplicate requests from peers %"PRIu32":%"PRIu32"->%"PRIu32, index, offset, length );
896    peers = getConnectedPeers( t, &size );
897    for( i=0; i<size; ++i )
898        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
899    tr_free( peers );
900}
901
902static void
903addStrike( Torrent * t,
904           tr_peer * peer )
905{
906    tordbg( t, "increasing peer %s strike count to %d",
907            tr_peerIoAddrStr( &peer->addr,
908                              peer->port ), peer->strikes + 1 );
909
910    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
911    {
912        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
913        atom->myflags |= MYFLAG_BANNED;
914        peer->doPurge = 1;
915        tordbg( t, "banning peer %s",
916               tr_peerIoAddrStr( &atom->addr, atom->port ) );
917    }
918}
919
920static void
921gotBadPiece( Torrent *        t,
922             tr_piece_index_t pieceIndex )
923{
924    tr_torrent *   tor = t->tor;
925    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
926
927    tor->corruptCur += byteCount;
928    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
929}
930
931static void
932refillSoon( Torrent * t )
933{
934    if( t->refillTimer == NULL )
935        t->refillTimer = tr_timerNew( t->manager->session,
936                                      refillPulse, t,
937                                      REFILL_PERIOD_MSEC );
938}
939
940static void
941peerSuggestedPiece( Torrent            * t UNUSED,
942                    tr_peer            * peer UNUSED,
943                    tr_piece_index_t     pieceIndex UNUSED,
944                    int                  isFastAllowed UNUSED )
945{
946#if 0
947    assert( t );
948    assert( peer );
949    assert( peer->msgs );
950
951    /* is this a valid piece? */
952    if(  pieceIndex >= t->tor->info.pieceCount )
953        return;
954
955    /* don't ask for it if we've already got it */
956    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
957        return;
958
959    /* don't ask for it if they don't have it */
960    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
961        return;
962
963    /* don't ask for it if we're choked and it's not fast */
964    if( !isFastAllowed && peer->clientIsChoked )
965        return;
966
967    /* request the blocks that we don't have in this piece */
968    {
969        tr_block_index_t block;
970        const tr_torrent * tor = t->tor;
971        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
972        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
973
974        for( block=start; block<end; ++block )
975        {
976            if( !tr_cpBlockIsComplete( tor->completion, block ) )
977            {
978                const uint32_t offset = getBlockOffsetInPiece( tor, block );
979                const uint32_t length = tr_torBlockCountBytes( tor, block );
980                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
981                incrementPieceRequests( t, pieceIndex );
982            }
983        }
984    }
985#endif
986}
987
988static void
989peerCallbackFunc( void * vpeer, void * vevent, void * vt )
990{
991    tr_peer * peer = vpeer; /* may be NULL if peer is a webseed */
992    Torrent * t = vt;
993    const tr_peer_event * e = vevent;
994
995    torrentLock( t );
996
997    switch( e->eventType )
998    {
999        case TR_PEER_UPLOAD_ONLY:
1000            /* update our atom */
1001            if( peer ) {
1002                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1003                a->uploadOnly = e->uploadOnly ? UPLOAD_ONLY_YES : UPLOAD_ONLY_NO;
1004            }
1005            break;
1006
1007        case TR_PEER_NEED_REQ:
1008            refillSoon( t );
1009            break;
1010
1011        case TR_PEER_CANCEL:
1012            decrementPieceRequests( t, e->pieceIndex );
1013            break;
1014
1015        case TR_PEER_PEER_GOT_DATA:
1016        {
1017            const time_t now = time( NULL );
1018            tr_torrent * tor = t->tor;
1019
1020            tor->activityDate = now;
1021
1022            if( e->wasPieceData )
1023                tor->uploadedCur += e->length;
1024
1025            /* update the stats */
1026            if( e->wasPieceData )
1027                tr_statsAddUploaded( tor->session, e->length );
1028
1029            /* update our atom */
1030            if( peer ) {
1031                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1032                if( e->wasPieceData )
1033                    a->piece_data_time = now;
1034            }
1035
1036            break;
1037        }
1038
1039        case TR_PEER_CLIENT_GOT_SUGGEST:
1040            if( peer )
1041                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1042            break;
1043
1044        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1045            if( peer )
1046                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1047            break;
1048
1049        case TR_PEER_CLIENT_GOT_DATA:
1050        {
1051            const time_t now = time( NULL );
1052            tr_torrent * tor = t->tor;
1053            tor->activityDate = now;
1054
1055            /* only add this to downloadedCur if we got it from a peer --
1056             * webseeds shouldn't count against our ratio.  As one tracker
1057             * admin put it, "Those pieces are downloaded directly from the
1058             * content distributor, not the peers, it is the tracker's job
1059             * to manage the swarms, not the web server and does not fit
1060             * into the jurisdiction of the tracker." */
1061            if( peer && e->wasPieceData )
1062                tor->downloadedCur += e->length;
1063
1064            /* update the stats */ 
1065            if( e->wasPieceData )
1066                tr_statsAddDownloaded( tor->session, e->length );
1067
1068            /* update our atom */
1069            if( peer ) {
1070                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1071                if( e->wasPieceData )
1072                    a->piece_data_time = now;
1073            }
1074
1075            break;
1076        }
1077
1078        case TR_PEER_PEER_PROGRESS:
1079        {
1080            if( peer )
1081            {
1082                struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1083                const int peerIsSeed = e->progress >= 1.0;
1084                if( peerIsSeed ) {
1085                    tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1086                    atom->flags |= ADDED_F_SEED_FLAG;
1087                } else {
1088                    tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1089                    atom->flags &= ~ADDED_F_SEED_FLAG;
1090                }
1091            }
1092            break;
1093        }
1094
1095        case TR_PEER_CLIENT_GOT_BLOCK:
1096        {
1097            tr_torrent *     tor = t->tor;
1098
1099            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1100
1101            tr_cpBlockAdd( tor->completion, block );
1102            decrementPieceRequests( t, e->pieceIndex );
1103
1104            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1105
1106            if( tr_cpPieceIsComplete( tor->completion, e->pieceIndex ) )
1107            {
1108                const tr_piece_index_t p = e->pieceIndex;
1109                const tr_bool ok = tr_ioTestPiece( tor, p );
1110
1111                if( !ok )
1112                {
1113                    tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1114                               (unsigned long)p );
1115                }
1116
1117                tr_torrentSetHasPiece( tor, p, ok );
1118                tr_torrentSetPieceChecked( tor, p, TRUE );
1119                tr_peerMgrSetBlame( tor->session->peerMgr, tor->info.hash, p, ok );
1120
1121                if( !ok )
1122                    gotBadPiece( t, p );
1123                else
1124                {
1125                    int i, peerCount;
1126                    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1127                    for( i = 0; i < peerCount; ++i )
1128                        tr_peerMsgsHave( peers[i]->msgs, p );
1129                    tr_free( peers );
1130                }
1131
1132                tr_torrentRecheckCompleteness( tor );
1133            }
1134            break;
1135        }
1136
1137        case TR_PEER_ERROR:
1138            if( e->err == EINVAL )
1139            {
1140                addStrike( t, peer );
1141                peer->doPurge = 1;
1142                tordbg( t, "setting doPurge because we got an EINVAL error" );
1143            }
1144            else if( ( e->err == ERANGE )
1145                  || ( e->err == EMSGSIZE )
1146                  || ( e->err == ENOTCONN ) )
1147            {
1148                /* some protocol error from the peer */
1149                peer->doPurge = 1;
1150                tordbg( t, "setting doPurge because we got an ERANGE, EMSGSIZE, or ENOTCONN error" );
1151            }
1152            else /* a local error, such as an IO error */
1153            {
1154                t->tor->error = e->err;
1155                tr_strlcpy( t->tor->errorString,
1156                            tr_strerror( t->tor->error ),
1157                            sizeof( t->tor->errorString ) );
1158                tr_torrentStop( t->tor );
1159            }
1160            break;
1161
1162        default:
1163            assert( 0 );
1164    }
1165
1166    torrentUnlock( t );
1167}
1168
1169static void
1170ensureAtomExists( Torrent          * t,
1171                  const tr_address * addr,
1172                  tr_port            port,
1173                  uint8_t            flags,
1174                  uint8_t            from )
1175{
1176    if( getExistingAtom( t, addr ) == NULL )
1177    {
1178        struct peer_atom * a;
1179        a = tr_new0( struct peer_atom, 1 );
1180        a->addr = *addr;
1181        a->port = port;
1182        a->flags = flags;
1183        a->from = from;
1184        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr( &a->addr, a->port ) );
1185        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
1186    }
1187}
1188
1189static int
1190getMaxPeerCount( const tr_torrent * tor )
1191{
1192    return tor->maxConnectedPeers;
1193}
1194
1195static int
1196getPeerCount( const Torrent * t )
1197{
1198    return tr_ptrArraySize( t->peers ) + tr_ptrArraySize( t->outgoingHandshakes );
1199}
1200
1201/* FIXME: this is kind of a mess. */
1202static tr_bool
1203myHandshakeDoneCB( tr_handshake  * handshake,
1204                   tr_peerIo     * io,
1205                   int             isConnected,
1206                   const uint8_t * peer_id,
1207                   void          * vmanager )
1208{
1209    tr_bool            ok = isConnected;
1210    tr_bool            success = FALSE;
1211    tr_port            port;
1212    const tr_address * addr;
1213    tr_peerMgr       * manager = vmanager;
1214    Torrent          * t;
1215    tr_handshake     * ours;
1216
1217    assert( io );
1218    assert( isConnected == 0 || isConnected == 1 );
1219
1220    t = tr_peerIoHasTorrentHash( io )
1221        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1222        : NULL;
1223
1224    if( tr_peerIoIsIncoming ( io ) )
1225        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
1226                                        handshake, handshakeCompare );
1227    else if( t )
1228        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
1229                                        handshake, handshakeCompare );
1230    else
1231        ours = handshake;
1232
1233    assert( ours );
1234    assert( ours == handshake );
1235
1236    if( t )
1237        torrentLock( t );
1238
1239    addr = tr_peerIoGetAddress( io, &port );
1240
1241    if( !ok || !t || !t->isRunning )
1242    {
1243        if( t )
1244        {
1245            struct peer_atom * atom = getExistingAtom( t, addr );
1246            if( atom )
1247                ++atom->numFails;
1248        }
1249    }
1250    else /* looking good */
1251    {
1252        struct peer_atom * atom;
1253        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1254        atom = getExistingAtom( t, addr );
1255        atom->time = time( NULL );
1256        atom->piece_data_time = 0;
1257
1258        if( atom->myflags & MYFLAG_BANNED )
1259        {
1260            tordbg( t, "banned peer %s tried to reconnect",
1261                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
1262        }
1263        else if( tr_peerIoIsIncoming( io )
1264               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1265
1266        {
1267        }
1268        else
1269        {
1270            tr_peer * peer = getExistingPeer( t, addr );
1271
1272            if( peer ) /* we already have this peer */
1273            {
1274            }
1275            else
1276            {
1277                peer = getPeer( t, addr );
1278                tr_free( peer->client );
1279
1280                if( !peer_id )
1281                    peer->client = NULL;
1282                else {
1283                    char client[128];
1284                    tr_clientForId( client, sizeof( client ), peer_id );
1285                    peer->client = tr_strdup( client );
1286                }
1287
1288                peer->port = port;
1289                peer->io = tr_handshakeStealIO( handshake );
1290                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1291                tr_peerIoSetBandwidth( io, peer->bandwidth );
1292
1293                success = TRUE;
1294            }
1295        }
1296    }
1297
1298    if( !success )
1299        tr_ptrArrayAppend( manager->finishedHandshakes, handshake );
1300
1301    if( t )
1302        torrentUnlock( t );
1303
1304    return success;
1305}
1306
1307void
1308tr_peerMgrAddIncoming( tr_peerMgr * manager,
1309                       tr_address * addr,
1310                       tr_port      port,
1311                       int          socket )
1312{
1313    managerLock( manager );
1314
1315    if( tr_sessionIsAddressBlocked( manager->session, addr ) )
1316    {
1317        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
1318        tr_netClose( socket );
1319    }
1320    else if( getExistingHandshake( manager->incomingHandshakes, addr ) )
1321    {
1322        tr_netClose( socket );
1323    }
1324    else /* we don't have a connetion to them yet... */
1325    {
1326        tr_peerIo *    io;
1327        tr_handshake * handshake;
1328
1329        io = tr_peerIoNewIncoming( manager->session, addr, port, socket );
1330
1331        handshake = tr_handshakeNew( io,
1332                                     manager->session->encryptionMode,
1333                                     myHandshakeDoneCB,
1334                                     manager );
1335
1336        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake,
1337                                 handshakeCompare );
1338    }
1339
1340    managerUnlock( manager );
1341}
1342
1343static tr_bool
1344tr_isPex( const tr_pex * pex )
1345{
1346    return pex && tr_isAddress( &pex->addr );
1347}
1348
1349void
1350tr_peerMgrAddPex( tr_peerMgr *    manager,
1351                  const uint8_t * torrentHash,
1352                  uint8_t         from,
1353                  const tr_pex *  pex )
1354{
1355    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
1356    {
1357        Torrent * t;
1358        managerLock( manager );
1359
1360        t = getExistingTorrent( manager, torrentHash );
1361        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
1362            ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from );
1363
1364        managerUnlock( manager );
1365    }
1366}
1367
1368tr_pex *
1369tr_peerMgrCompactToPex( const void *    compact,
1370                        size_t          compactLen,
1371                        const uint8_t * added_f,
1372                        size_t          added_f_len,
1373                        size_t *        pexCount )
1374{
1375    size_t          i;
1376    size_t          n = compactLen / 6;
1377    const uint8_t * walk = compact;
1378    tr_pex *        pex = tr_new0( tr_pex, n );
1379
1380    for( i = 0; i < n; ++i )
1381    {
1382        pex[i].addr.type = TR_AF_INET;
1383        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
1384        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1385        if( added_f && ( n == added_f_len ) )
1386            pex[i].flags = added_f[i];
1387    }
1388
1389    *pexCount = n;
1390    return pex;
1391}
1392
1393tr_pex *
1394tr_peerMgrCompact6ToPex( const void    * compact,
1395                         size_t          compactLen,
1396                         const uint8_t * added_f,
1397                         size_t          added_f_len,
1398                         size_t        * pexCount )
1399{
1400    size_t          i;
1401    size_t          n = compactLen / 18;
1402    const uint8_t * walk = compact;
1403    tr_pex *        pex = tr_new0( tr_pex, n );
1404   
1405    for( i = 0; i < n; ++i )
1406    {
1407        pex[i].addr.type = TR_AF_INET6;
1408        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
1409        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1410        if( added_f && ( n == added_f_len ) )
1411            pex[i].flags = added_f[i];
1412    }
1413   
1414    *pexCount = n;
1415    return pex;
1416}
1417
1418tr_pex *
1419tr_peerMgrArrayToPex( const void * array,
1420                      size_t       arrayLen,
1421                      size_t      * pexCount )
1422{
1423    size_t          i;
1424    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
1425    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
1426    const uint8_t * walk = array;
1427    tr_pex        * pex = tr_new0( tr_pex, n );
1428   
1429    for( i = 0 ; i < n ; i++ ) {
1430        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
1431        tr_suspectAddress( &pex[i].addr, "tracker"  );
1432        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
1433        pex[i].flags = 0x00;
1434        walk += sizeof( tr_address ) + 2;
1435    }
1436   
1437    *pexCount = n;
1438    return pex;
1439}
1440
1441/**
1442***
1443**/
1444
1445void
1446tr_peerMgrSetBlame( tr_peerMgr *     manager,
1447                    const uint8_t *  torrentHash,
1448                    tr_piece_index_t pieceIndex,
1449                    int              success )
1450{
1451    if( !success )
1452    {
1453        int        peerCount, i;
1454        Torrent *  t = getExistingTorrent( manager, torrentHash );
1455        tr_peer ** peers;
1456
1457        assert( torrentIsLocked( t ) );
1458
1459        peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1460        for( i = 0; i < peerCount; ++i )
1461        {
1462            tr_peer * peer = peers[i];
1463            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1464            {
1465                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1466                        tr_peerIoAddrStr( &peer->addr, peer->port ),
1467                        pieceIndex, (int)peer->strikes + 1 );
1468                addStrike( t, peer );
1469            }
1470        }
1471    }
1472}
1473
1474int
1475tr_pexCompare( const void * va, const void * vb )
1476{
1477    const tr_pex * a = va;
1478    const tr_pex * b = vb;
1479    int i;
1480
1481    assert( tr_isPex( a ) );
1482    assert( tr_isPex( b ) );
1483
1484    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
1485        return i;
1486
1487    if( a->port != b->port )
1488        return a->port < b->port ? -1 : 1;
1489
1490    return 0;
1491}
1492
1493static int
1494peerPrefersCrypto( const tr_peer * peer )
1495{
1496    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1497        return TRUE;
1498
1499    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1500        return FALSE;
1501
1502    return tr_peerIoIsEncrypted( peer->io );
1503}
1504
1505int
1506tr_peerMgrGetPeers( tr_peerMgr      * manager,
1507                    const uint8_t   * torrentHash,
1508                    tr_pex         ** setme_pex,
1509                    uint8_t           af)
1510{
1511    int peerCount = 0;
1512    int peersReturning = 0;
1513    const Torrent *  t;
1514
1515    managerLock( manager );
1516
1517    t = getExistingTorrent( manager, torrentHash );
1518    if( t == NULL )
1519    {
1520        *setme_pex = NULL;
1521    }
1522    else
1523    {
1524        int i;
1525        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1526        /* for now, this will waste memory on torrents that have both
1527         * ipv6 and ipv4 peers */
1528        tr_pex * pex = tr_new( tr_pex, peerCount );
1529        tr_pex * walk = pex;
1530
1531        for( i=0; i<peerCount; ++i )
1532        {
1533            const tr_peer * peer = peers[i];
1534            if( peer->addr.type == af )
1535            {
1536                const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1537
1538                assert( tr_isAddress( &peer->addr ) );
1539                walk->addr = peer->addr;
1540                walk->port = peer->port;
1541                walk->flags = 0;
1542                if( peerPrefersCrypto( peer ) )
1543                    walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1544                if( ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 ) )
1545                    walk->flags |= ADDED_F_SEED_FLAG;
1546                ++peersReturning;
1547                ++walk;
1548            }
1549        }
1550
1551        assert( ( walk - pex ) == peersReturning );
1552        qsort( pex, peersReturning, sizeof( tr_pex ), tr_pexCompare );
1553        *setme_pex = pex;
1554    }
1555
1556    managerUnlock( manager );
1557    return peersReturning;
1558}
1559
1560static int reconnectPulse( void * vtorrent );
1561
1562static int rechokePulse( void * vtorrent );
1563
1564void
1565tr_peerMgrStartTorrent( tr_peerMgr *    manager,
1566                        const uint8_t * torrentHash )
1567{
1568    Torrent * t;
1569
1570    managerLock( manager );
1571
1572    t = getExistingTorrent( manager, torrentHash );
1573
1574    assert( t );
1575    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1576    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1577
1578    if( !t->isRunning )
1579    {
1580        t->isRunning = 1;
1581
1582        t->reconnectTimer = tr_timerNew( t->manager->session,
1583                                         reconnectPulse, t,
1584                                         RECONNECT_PERIOD_MSEC );
1585
1586        t->rechokeTimer = tr_timerNew( t->manager->session,
1587                                       rechokePulse, t,
1588                                       RECHOKE_PERIOD_MSEC );
1589
1590        reconnectPulse( t );
1591
1592        rechokePulse( t );
1593
1594        if( !tr_ptrArrayEmpty( t->webseeds ) )
1595            refillSoon( t );
1596    }
1597
1598    managerUnlock( manager );
1599}
1600
1601static void
1602stopTorrent( Torrent * t )
1603{
1604    assert( torrentIsLocked( t ) );
1605
1606    t->isRunning = 0;
1607    tr_timerFree( &t->rechokeTimer );
1608    tr_timerFree( &t->reconnectTimer );
1609
1610    /* disconnect the peers. */
1611    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)peerDestructor );
1612    tr_ptrArrayClear( t->peers );
1613
1614    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1615     * which removes the handshake from t->outgoingHandshakes... */
1616    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1617        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1618}
1619
1620void
1621tr_peerMgrStopTorrent( tr_peerMgr *    manager,
1622                       const uint8_t * torrentHash )
1623{
1624    managerLock( manager );
1625
1626    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1627
1628    managerUnlock( manager );
1629}
1630
1631void
1632tr_peerMgrAddTorrent( tr_peerMgr * manager,
1633                      tr_torrent * tor )
1634{
1635    Torrent * t;
1636
1637    managerLock( manager );
1638
1639    assert( tor );
1640    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1641
1642    t = torrentConstructor( manager, tor );
1643    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1644
1645    managerUnlock( manager );
1646}
1647
1648void
1649tr_peerMgrRemoveTorrent( tr_peerMgr *    manager,
1650                         const uint8_t * torrentHash )
1651{
1652    Torrent * t;
1653
1654    managerLock( manager );
1655
1656    t = getExistingTorrent( manager, torrentHash );
1657    assert( t );
1658    stopTorrent( t );
1659    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1660    torrentDestructor( t );
1661
1662    managerUnlock( manager );
1663}
1664
1665void
1666tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1667                               const uint8_t *    torrentHash,
1668                               int8_t *           tab,
1669                               unsigned int       tabCount )
1670{
1671    tr_piece_index_t   i;
1672    const Torrent *    t;
1673    const tr_torrent * tor;
1674    float              interval;
1675    tr_bool            isSeed;
1676    int                peerCount;
1677    const tr_peer **   peers;
1678
1679    managerLock( manager );
1680
1681    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1682    tor = t->tor;
1683    interval = tor->info.pieceCount / (float)tabCount;
1684    isSeed = tor && ( tr_cpGetStatus ( tor->completion ) == TR_SEED );
1685    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1686
1687    memset( tab, 0, tabCount );
1688
1689    for( i = 0; tor && i < tabCount; ++i )
1690    {
1691        const int piece = i * interval;
1692
1693        if( isSeed || tr_cpPieceIsComplete( tor->completion, piece ) )
1694            tab[i] = -1;
1695        else if( peerCount ) {
1696            int j;
1697            for( j = 0; j < peerCount; ++j )
1698                if( tr_bitfieldHas( peers[j]->have, i ) )
1699                    ++tab[i];
1700        }
1701    }
1702
1703    managerUnlock( manager );
1704}
1705
1706/* Returns the pieces that are available from peers */
1707tr_bitfield*
1708tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1709                        const uint8_t *    torrentHash )
1710{
1711    int           i, size;
1712    Torrent *     t;
1713    tr_peer **    peers;
1714    tr_bitfield * pieces;
1715    managerLock( manager );
1716
1717    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1718    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1719    peers = getConnectedPeers( t, &size );
1720    for( i = 0; i < size; ++i )
1721        tr_bitfieldOr( pieces, peers[i]->have );
1722
1723    managerUnlock( manager );
1724    tr_free( peers );
1725    return pieces;
1726}
1727
1728int
1729tr_peerMgrHasConnections( const tr_peerMgr * manager,
1730                          const uint8_t *    torrentHash )
1731{
1732    int ret;
1733    const Torrent * t;
1734    managerLock( manager );
1735
1736    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1737    ret = t && ( !tr_ptrArrayEmpty( t->peers ) || !tr_ptrArrayEmpty( t->webseeds ) );
1738
1739    managerUnlock( manager );
1740    return ret;
1741}
1742
1743void
1744tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1745                        const uint8_t    * torrentHash,
1746                        int              * setmePeersKnown,
1747                        int              * setmePeersConnected,
1748                        int              * setmeSeedsConnected,
1749                        int              * setmeWebseedsSendingToUs,
1750                        int              * setmePeersSendingToUs,
1751                        int              * setmePeersGettingFromUs,
1752                        int              * setmePeersFrom )
1753{
1754    int i, size;
1755    const Torrent * t;
1756    const tr_peer ** peers;
1757    const tr_webseed ** webseeds;
1758
1759    managerLock( manager );
1760
1761    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1762    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1763
1764    *setmePeersKnown           = tr_ptrArraySize( t->pool );
1765    *setmePeersConnected       = 0;
1766    *setmeSeedsConnected       = 0;
1767    *setmePeersGettingFromUs   = 0;
1768    *setmePeersSendingToUs     = 0;
1769    *setmeWebseedsSendingToUs  = 0;
1770
1771    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1772        setmePeersFrom[i] = 0;
1773
1774    for( i=0; i<size; ++i )
1775    {
1776        const tr_peer * peer = peers[i];
1777        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1778
1779        if( peer->io == NULL ) /* not connected */
1780            continue;
1781
1782        ++*setmePeersConnected;
1783
1784        ++setmePeersFrom[atom->from];
1785
1786        if( clientIsDownloadingFrom( peer ) )
1787            ++*setmePeersSendingToUs;
1788
1789        if( clientIsUploadingTo( peer ) )
1790            ++*setmePeersGettingFromUs;
1791
1792        if( atom->flags & ADDED_F_SEED_FLAG )
1793            ++*setmeSeedsConnected;
1794    }
1795
1796    webseeds = (const tr_webseed **) tr_ptrArrayPeek( t->webseeds, &size );
1797    for( i=0; i<size; ++i )
1798        if( tr_webseedIsActive( webseeds[i] ) )
1799            ++*setmeWebseedsSendingToUs;
1800
1801    managerUnlock( manager );
1802}
1803
1804float*
1805tr_peerMgrWebSpeeds( const tr_peerMgr * manager,
1806                     const uint8_t *    torrentHash )
1807{
1808    const Torrent * t;
1809    const tr_webseed ** webseeds;
1810    int i;
1811    int webseedCount;
1812    float * ret;
1813
1814    assert( manager );
1815    managerLock( manager );
1816
1817    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1818    webseeds = (const tr_webseed**) tr_ptrArrayPeek( t->webseeds,
1819                                                     &webseedCount );
1820    assert( webseedCount == t->tor->info.webseedCount );
1821    ret = tr_new0( float, webseedCount );
1822
1823    for( i=0; i<webseedCount; ++i )
1824        if( !tr_webseedGetSpeed( webseeds[i], &ret[i] ) )
1825            ret[i] = -1.0;
1826
1827    managerUnlock( manager );
1828    return ret;
1829}
1830
1831double
1832tr_peerGetPieceSpeed( const tr_peer * peer, tr_direction direction )
1833{
1834    assert( peer );
1835    assert( direction==TR_CLIENT_TO_PEER || direction==TR_PEER_TO_CLIENT );
1836
1837    return tr_bandwidthGetPieceSpeed( peer->bandwidth, direction );
1838}
1839
1840
1841struct tr_peer_stat *
1842tr_peerMgrPeerStats( const   tr_peerMgr  * manager,
1843                     const   uint8_t     * torrentHash,
1844                     int                 * setmeCount UNUSED )
1845{
1846    int             i, size;
1847    const Torrent * t;
1848    tr_peer **      peers;
1849    tr_peer_stat *  ret;
1850
1851    assert( manager );
1852    managerLock( manager );
1853
1854    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1855    peers = getConnectedPeers( (Torrent*)t, &size );
1856    ret = tr_new0( tr_peer_stat, size );
1857
1858    for( i = 0; i < size; ++i )
1859    {
1860        char *                   pch;
1861        const tr_peer *          peer = peers[i];
1862        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1863        tr_peer_stat *           stat = ret + i;
1864        tr_address               norm_addr;
1865
1866        norm_addr = peer->addr;
1867        tr_normalizeV4Mapped( &norm_addr );
1868        tr_ntop( &norm_addr, stat->addr, sizeof( stat->addr ) );
1869        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
1870                   sizeof( stat->client ) );
1871        stat->port               = ntohs( peer->port );
1872        stat->from               = atom->from;
1873        stat->progress           = peer->progress;
1874        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1875        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER );
1876        stat->rateToClient       = tr_peerGetPieceSpeed( peer, TR_PEER_TO_CLIENT );
1877        stat->peerIsChoked       = peer->peerIsChoked;
1878        stat->peerIsInterested   = peer->peerIsInterested;
1879        stat->clientIsChoked     = peer->clientIsChoked;
1880        stat->clientIsInterested = peer->clientIsInterested;
1881        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1882        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1883        stat->isUploadingTo      = clientIsUploadingTo( peer );
1884        stat->isSeed             = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
1885
1886        pch = stat->flagStr;
1887        if( t->optimistic == peer ) *pch++ = 'O';
1888        if( stat->isDownloadingFrom ) *pch++ = 'D';
1889        else if( stat->clientIsInterested ) *pch++ = 'd';
1890        if( stat->isUploadingTo ) *pch++ = 'U';
1891        else if( stat->peerIsInterested ) *pch++ = 'u';
1892        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
1893        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1894        if( stat->isEncrypted ) *pch++ = 'E';
1895        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1896        if( stat->isIncoming ) *pch++ = 'I';
1897        *pch = '\0';
1898    }
1899
1900    *setmeCount = size;
1901    tr_free( peers );
1902
1903    managerUnlock( manager );
1904    return ret;
1905}
1906
1907/**
1908***
1909**/
1910
1911struct ChokeData
1912{
1913    tr_bool         doUnchoke;
1914    tr_bool         isInterested;
1915    tr_bool         isChoked;
1916    int             rate;
1917    tr_peer *       peer;
1918};
1919
1920static int
1921compareChoke( const void * va,
1922              const void * vb )
1923{
1924    const struct ChokeData * a = va;
1925    const struct ChokeData * b = vb;
1926
1927    if( a->rate != b->rate ) /* prefer higher overall speeds */
1928        return a->rate > b->rate ? -1 : 1;
1929
1930    if( a->isChoked != b->isChoked ) /* prefer unchoked */
1931        return a->isChoked ? 1 : -1;
1932
1933    return 0;
1934}
1935
1936static int
1937isNew( const tr_peer * peer )
1938{
1939    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1940}
1941
1942static int
1943isSame( const tr_peer * peer )
1944{
1945    return peer && peer->client && strstr( peer->client, "Transmission" );
1946}
1947
1948/**
1949***
1950**/
1951
1952static void
1953rechoke( Torrent * t )
1954{
1955    int                i, peerCount, size, unchokedInterested;
1956    tr_peer **         peers = getConnectedPeers( t, &peerCount );
1957    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1958    const int          chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
1959
1960    assert( torrentIsLocked( t ) );
1961
1962    /* sort the peers by preference and rate */
1963    for( i = 0, size = 0; i < peerCount; ++i )
1964    {
1965        tr_peer * peer = peers[i];
1966        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1967
1968        if( peer->progress >= 1.0 ) /* choke all seeds */
1969        {
1970            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1971        }
1972        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
1973        {
1974            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1975        }
1976        else if( chokeAll ) /* choke everyone if we're not uploading */
1977        {
1978            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1979        }
1980        else
1981        {
1982            struct ChokeData * n = &choke[size++];
1983            n->peer         = peer;
1984            n->isInterested = peer->peerIsInterested;
1985            n->isChoked     = peer->peerIsChoked;
1986            n->rate         = tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER ) * 1024;
1987        }
1988    }
1989
1990    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
1991
1992    /**
1993     * Reciprocation and number of uploads capping is managed by unchoking
1994     * the N peers which have the best upload rate and are interested.
1995     * This maximizes the client's download rate. These N peers are
1996     * referred to as downloaders, because they are interested in downloading
1997     * from the client.
1998     *
1999     * Peers which have a better upload rate (as compared to the downloaders)
2000     * but aren't interested get unchoked. If they become interested, the
2001     * downloader with the worst upload rate gets choked. If a client has
2002     * a complete file, it uses its upload rate rather than its download
2003     * rate to decide which peers to unchoke.
2004     */
2005    unchokedInterested = 0;
2006    for( i=0; i<size && unchokedInterested<MAX_UNCHOKED_PEERS; ++i ) {
2007        choke[i].doUnchoke = 1;
2008        if( choke[i].isInterested )
2009            ++unchokedInterested;
2010    }
2011
2012    /* optimistic unchoke */
2013    if( i < size )
2014    {
2015        int n;
2016        struct ChokeData * c;
2017        tr_ptrArray * randPool = tr_ptrArrayNew( );
2018
2019        for( ; i<size; ++i )
2020        {
2021            if( choke[i].isInterested )
2022            {
2023                const tr_peer * peer = choke[i].peer;
2024                int x = 1, y;
2025                if( isNew( peer ) ) x *= 3;
2026                if( isSame( peer ) ) x *= 3;
2027                for( y=0; y<x; ++y )
2028                    tr_ptrArrayAppend( randPool, &choke[i] );
2029            }
2030        }
2031
2032        if(( n = tr_ptrArraySize( randPool )))
2033        {
2034            c = tr_ptrArrayNth( randPool, tr_cryptoWeakRandInt( n ));
2035            c->doUnchoke = 1;
2036            t->optimistic = c->peer;
2037        }
2038
2039        tr_ptrArrayFree( randPool, NULL );
2040    }
2041
2042    for( i=0; i<size; ++i )
2043        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
2044
2045    /* cleanup */
2046    tr_free( choke );
2047    tr_free( peers );
2048}
2049
2050static int
2051rechokePulse( void * vtorrent )
2052{
2053    Torrent * t = vtorrent;
2054
2055    torrentLock( t );
2056    rechoke( t );
2057    torrentUnlock( t );
2058    return TRUE;
2059}
2060
2061/***
2062****
2063****  Life and Death
2064****
2065***/
2066
2067static int
2068shouldPeerBeClosed( const Torrent * t,
2069                    const tr_peer * peer,
2070                    int             peerCount )
2071{
2072    const tr_torrent *       tor = t->tor;
2073    const time_t             now = time( NULL );
2074    const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
2075
2076    /* if it's marked for purging, close it */
2077    if( peer->doPurge )
2078    {
2079        tordbg( t, "purging peer %s because its doPurge flag is set",
2080                tr_peerIoAddrStr( &atom->addr, atom->port ) );
2081        return TRUE;
2082    }
2083
2084    /* if we're seeding and the peer has everything we have,
2085     * and enough time has passed for a pex exchange, then disconnect */
2086    if( tr_torrentIsSeed( tor ) )
2087    {
2088        int peerHasEverything;
2089        if( atom->flags & ADDED_F_SEED_FLAG )
2090            peerHasEverything = TRUE;
2091        else if( peer->progress < tr_cpPercentDone( tor->completion ) )
2092            peerHasEverything = FALSE;
2093        else {
2094            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( tor->completion ) );
2095            tr_bitfieldDifference( tmp, peer->have );
2096            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2097            tr_bitfieldFree( tmp );
2098        }
2099
2100        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 )))
2101        {
2102            tordbg( t, "purging peer %s because we're both seeds",
2103                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
2104            return TRUE;
2105        }
2106    }
2107
2108    /* disconnect if it's been too long since piece data has been transferred.
2109     * this is on a sliding scale based on number of available peers... */
2110    {
2111        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2112        /* if we have >= relaxIfFewerThan, strictness is 100%.
2113         * if we have zero connections, strictness is 0% */
2114        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
2115            ? 1.0
2116            : peerCount / (float)relaxStrictnessIfFewerThanN;
2117        const int lo = MIN_UPLOAD_IDLE_SECS;
2118        const int hi = MAX_UPLOAD_IDLE_SECS;
2119        const int limit = hi - ( ( hi - lo ) * strictness );
2120        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
2121/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
2122        if( idleTime > limit ) {
2123            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
2124                       tr_peerIoAddrStr( &atom->addr, atom->port ), idleTime );
2125            return TRUE;
2126        }
2127    }
2128
2129    return FALSE;
2130}
2131
2132static tr_peer **
2133getPeersToClose( Torrent * t, int * setmeSize )
2134{
2135    int i, peerCount, outsize;
2136    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( t->peers, &peerCount );
2137    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2138
2139    assert( torrentIsLocked( t ) );
2140
2141    for( i = outsize = 0; i < peerCount; ++i )
2142        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
2143            ret[outsize++] = peers[i];
2144
2145    *setmeSize = outsize;
2146    return ret;
2147}
2148
2149static int
2150compareCandidates( const void * va,
2151                   const void * vb )
2152{
2153    const struct peer_atom * a = *(const struct peer_atom**) va;
2154    const struct peer_atom * b = *(const struct peer_atom**) vb;
2155
2156    /* <Charles> Here we would probably want to try reconnecting to
2157     * peers that had most recently given us data. Lots of users have
2158     * trouble with resets due to their routers and/or ISPs. This way we
2159     * can quickly recover from an unwanted reset. So we sort
2160     * piece_data_time in descending order.
2161     */
2162
2163    if( a->piece_data_time != b->piece_data_time )
2164        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2165
2166    if( a->numFails != b->numFails )
2167        return a->numFails < b->numFails ? -1 : 1;
2168
2169    if( a->time != b->time )
2170        return a->time < b->time ? -1 : 1;
2171
2172    /* all other things being equal, prefer peers whose
2173     * information comes from a more reliable source */
2174    if( a->from != b->from )
2175        return a->from < b->from ? -1 : 1;
2176
2177    return 0;
2178}
2179
2180static int
2181getReconnectIntervalSecs( const struct peer_atom * atom )
2182{
2183    int          sec;
2184    const time_t now = time( NULL );
2185
2186    /* if we were recently connected to this peer and transferring piece
2187     * data, try to reconnect to them sooner rather that later -- we don't
2188     * want network troubles to get in the way of a good peer. */
2189    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2190        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2191
2192    /* don't allow reconnects more often than our minimum */
2193    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2194        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2195
2196    /* otherwise, the interval depends on how many times we've tried
2197     * and failed to connect to the peer */
2198    else switch( atom->numFails ) {
2199        case 0: sec = 0; break;
2200        case 1: sec = 5; break;
2201        case 2: sec = 2 * 60; break;
2202        case 3: sec = 15 * 60; break;
2203        case 4: sec = 30 * 60; break;
2204        case 5: sec = 60 * 60; break;
2205        default: sec = 120 * 60; break;
2206    }
2207
2208    return sec;
2209}
2210
2211static struct peer_atom **
2212getPeerCandidates( Torrent * t, int * setmeSize )
2213{
2214    int                 i, atomCount, retCount;
2215    struct peer_atom ** atoms;
2216    struct peer_atom ** ret;
2217    const time_t        now = time( NULL );
2218    const int           seed = tr_torrentIsSeed( t->tor );
2219
2220    assert( torrentIsLocked( t ) );
2221
2222    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &atomCount );
2223    ret = tr_new( struct peer_atom*, atomCount );
2224    for( i = retCount = 0; i < atomCount; ++i )
2225    {
2226        int                interval;
2227        struct peer_atom * atom = atoms[i];
2228
2229        /* peer fed us too much bad data ... we only keep it around
2230         * now to weed it out in case someone sends it to us via pex */
2231        if( atom->myflags & MYFLAG_BANNED )
2232            continue;
2233
2234        /* peer was unconnectable before, so we're not going to keep trying.
2235         * this is needs a separate flag from `banned', since if they try
2236         * to connect to us later, we'll let them in */
2237        if( atom->myflags & MYFLAG_UNREACHABLE )
2238            continue;
2239
2240        /* we don't need two connections to the same peer... */
2241        if( peerIsInUse( t, &atom->addr ) )
2242            continue;
2243
2244        /* no need to connect if we're both seeds... */
2245        if( seed && ( ( atom->flags & ADDED_F_SEED_FLAG ) ||
2246                      ( atom->uploadOnly == UPLOAD_ONLY_YES ) ) )
2247            continue;
2248
2249        /* don't reconnect too often */
2250        interval = getReconnectIntervalSecs( atom );
2251        if( ( now - atom->time ) < interval )
2252        {
2253            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2254                    i, tr_peerIoAddrStr( &atom->addr, atom->port ), interval );
2255            continue;
2256        }
2257
2258        /* Don't connect to peers in our blocklist */
2259        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2260            continue;
2261
2262        ret[retCount++] = atom;
2263    }
2264
2265    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2266    *setmeSize = retCount;
2267    return ret;
2268}
2269
2270static int
2271reconnectPulse( void * vtorrent )
2272{
2273    Torrent *     t = vtorrent;
2274    static time_t prevTime = 0;
2275    static int    newConnectionsThisSecond = 0;
2276    time_t        now;
2277
2278    torrentLock( t );
2279
2280    now = time( NULL );
2281    if( prevTime != now )
2282    {
2283        prevTime = now;
2284        newConnectionsThisSecond = 0;
2285    }
2286
2287    if( !t->isRunning )
2288    {
2289        removeAllPeers( t );
2290    }
2291    else
2292    {
2293        int i, nCandidates, nBad;
2294        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
2295        struct tr_peer ** connections = getPeersToClose( t, &nBad );
2296
2297        if( nBad || nCandidates )
2298            tordbg( t, "reconnect pulse for [%s]: %d bad connections, "
2299                    "%d connection candidates, %d atoms, max per pulse is %d",
2300                    t->tor->info.name, nBad, nCandidates,
2301                    tr_ptrArraySize( t->pool ),
2302                    (int)MAX_RECONNECTIONS_PER_PULSE );
2303
2304        /* disconnect some peers.
2305           if we transferred piece data, then they might be good peers,
2306           so reset their `numFails' weight to zero.  otherwise we connected
2307           to them fruitlessly, so mark it as another fail */
2308        for( i = 0; i < nBad; ++i ) {
2309            tr_peer * peer = connections[i];
2310            struct peer_atom * atom = getExistingAtom( t, &peer->addr );
2311            if( atom->piece_data_time )
2312                atom->numFails = 0;
2313            else
2314                ++atom->numFails;
2315            tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
2316            removePeer( t, peer );
2317        }
2318
2319        /* add some new ones */
2320        for( i = 0;    ( i < nCandidates )
2321           && ( i < MAX_RECONNECTIONS_PER_PULSE )
2322           && ( getPeerCount( t ) < getMaxPeerCount( t->tor ) )
2323           && ( newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND ); ++i )
2324        {
2325            tr_peerMgr *       mgr = t->manager;
2326            struct peer_atom * atom = candidates[i];
2327            tr_peerIo *        io;
2328
2329            tordbg( t, "Starting an OUTGOING connection with %s",
2330                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2331
2332            io = tr_peerIoNewOutgoing( mgr->session, &atom->addr, atom->port, t->hash );
2333            if( io == NULL )
2334            {
2335                atom->myflags |= MYFLAG_UNREACHABLE;
2336            }
2337            else
2338            {
2339                tr_handshake * handshake = tr_handshakeNew( io,
2340                                                            mgr->session->encryptionMode,
2341                                                            myHandshakeDoneCB,
2342                                                            mgr );
2343
2344                assert( tr_peerIoGetTorrentHash( io ) );
2345
2346                ++newConnectionsThisSecond;
2347
2348                tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake,
2349                                         handshakeCompare );
2350            }
2351
2352            atom->time = time( NULL );
2353        }
2354
2355        /* cleanup */
2356        tr_free( connections );
2357        tr_free( candidates );
2358    }
2359
2360    torrentUnlock( t );
2361    return TRUE;
2362}
2363
2364/****
2365*****
2366*****  BANDWIDTH ALLOCATION
2367*****
2368****/
2369
2370static void
2371pumpAllPeers( tr_peerMgr * mgr )
2372{
2373    const int torrentCount = tr_ptrArraySize( mgr->torrents );
2374    int       i, j;
2375
2376    for( i=0; i<torrentCount; ++i )
2377    {
2378        Torrent * t = tr_ptrArrayNth( mgr->torrents, i );
2379        for( j=0; j<tr_ptrArraySize( t->peers ); ++j )
2380        {
2381            tr_peer * peer = tr_ptrArrayNth( t->peers, j );
2382            tr_peerMsgsPulse( peer->msgs );
2383        }
2384    }
2385}
2386
2387static int
2388bandwidthPulse( void * vmgr )
2389{
2390    tr_handshake * handshake;
2391    tr_peerMgr * mgr = vmgr;
2392    managerLock( mgr );
2393
2394    /* FIXME: this next line probably isn't necessary... */
2395    pumpAllPeers( mgr );
2396
2397    /* allocate bandwidth to the peers */
2398    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
2399    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
2400
2401    /* free all the finished handshakes */
2402    while(( handshake = tr_ptrArrayPop( mgr->finishedHandshakes )))
2403        tr_handshakeFree( handshake );
2404
2405    managerUnlock( mgr );
2406    return TRUE;
2407}
Note: See TracBrowser for help on using the repository browser.