source: trunk/libtransmission/peer-mgr.c @ 7486

Last change on this file since 7486 was 7486, checked in by charles, 12 years ago

(trunk libT) fix connectivity error reported by Stargazer. Also, add more debug statements to track down errors like this in the future

  • Property svn:keywords set to Date Rev Author Id
File size: 67.9 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 7486 2008-12-24 02:50:08Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h> /* memcpy, memcmp, strstr */
16#include <stdlib.h> /* qsort */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "session.h"
23#include "bandwidth.h"
24#include "bencode.h"
25#include "blocklist.h"
26#include "clients.h"
27#include "completion.h"
28#include "crypto.h"
29#include "handshake.h"
30#include "inout.h" /* tr_ioTestPiece */
31#include "net.h"
32#include "peer-io.h"
33#include "peer-mgr.h"
34#include "peer-mgr-private.h"
35#include "peer-msgs.h"
36#include "ptrarray.h"
37#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
38#include "torrent.h"
39#include "trevent.h"
40#include "utils.h"
41#include "webseed.h"
42
43enum
44{
45    /* how frequently to change which peers are choked */
46    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
47
48    /* minimum interval for refilling peers' request lists */
49    REFILL_PERIOD_MSEC = 333,
50
51    /* when many peers are available, keep idle ones this long */
52    MIN_UPLOAD_IDLE_SECS = ( 30 ),
53
54    /* when few peers are available, keep idle ones this long */
55    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
56
57    /* how frequently to decide which peers live and die */
58    RECONNECT_PERIOD_MSEC = ( 2 * 1000 ),
59   
60    /* how frequently to reallocate bandwidth */
61    BANDWIDTH_PERIOD_MSEC = 500,
62
63    /* max # of peers to ask fer per torrent per reconnect pulse */
64    MAX_RECONNECTIONS_PER_PULSE = 4,
65
66    /* max number of peers to ask for per second overall.
67    * this throttle is to avoid overloading the router */
68    MAX_CONNECTIONS_PER_SECOND = 8,
69
70    /* number of unchoked peers per torrent.
71     * FIXME: this probably ought to be configurable */
72    MAX_UNCHOKED_PEERS = 14,
73
74    /* number of bad pieces a peer is allowed to send before we ban them */
75    MAX_BAD_PIECES_PER_PEER = 5,
76
77    /* use for bitwise operations w/peer_atom.myflags */
78    MYFLAG_BANNED = 1,
79
80    /* unreachable for now... but not banned.
81     * if they try to connect to us it's okay */
82    MYFLAG_UNREACHABLE = 2,
83
84    /* the minimum we'll wait before attempting to reconnect to a peer */
85    MINIMUM_RECONNECT_INTERVAL_SECS = 5
86};
87
88
89/**
90***
91**/
92
93enum
94{
95    UPLOAD_ONLY_UKNOWN,
96    UPLOAD_ONLY_YES,
97    UPLOAD_ONLY_NO
98};
99
100/**
101 * Peer information that should be kept even before we've connected and
102 * after we've disconnected.  These are kept in a pool of peer_atoms to decide
103 * which ones would make good candidates for connecting to, and to watch out
104 * for banned peers.
105 *
106 * @see tr_peer
107 * @see tr_peermsgs
108 */
109struct peer_atom
110{
111    uint8_t     from;
112    uint8_t     flags;       /* these match the added_f flags */
113    uint8_t     myflags;     /* flags that aren't defined in added_f */
114    uint8_t     uploadOnly;  /* UPLOAD_ONLY_ */
115    tr_port     port;
116    uint16_t    numFails;
117    tr_address  addr;
118    time_t      time;        /* when the peer's connection status last changed */
119    time_t      piece_data_time;
120};
121
122typedef struct
123{
124    tr_bool         isRunning;
125
126    uint8_t         hash[SHA_DIGEST_LENGTH];
127    int         *   pendingRequestCount;
128    tr_ptrArray *   outgoingHandshakes; /* tr_handshake */
129    tr_ptrArray *   pool; /* struct peer_atom */
130    tr_ptrArray *   peers; /* tr_peer */
131    tr_ptrArray *   webseeds; /* tr_webseed */
132    tr_timer *      reconnectTimer;
133    tr_timer *      rechokeTimer;
134    tr_timer *      refillTimer;
135    tr_torrent *    tor;
136    tr_peer *       optimistic; /* the optimistic peer, or NULL if none */
137
138    struct tr_peerMgr * manager;
139}
140Torrent;
141
142struct tr_peerMgr
143{
144    tr_session      * session;
145    tr_ptrArray     * torrents; /* Torrent */
146    tr_ptrArray     * incomingHandshakes; /* tr_handshake */
147    tr_ptrArray     * finishedHandshakes; /* tr_handshake */
148    tr_timer        * bandwidthTimer;
149};
150
151#define tordbg( t, ... ) \
152    do { \
153        if( tr_deepLoggingIsActive( ) ) \
154            tr_deepLog( __FILE__, __LINE__, t->tor->info.name, __VA_ARGS__ ); \
155    } while( 0 )
156
157#define dbgmsg( ... ) \
158    do { \
159        if( tr_deepLoggingIsActive( ) ) \
160            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
161    } while( 0 )
162
163/**
164***
165**/
166
167static void
168managerLock( const struct tr_peerMgr * manager )
169{
170    tr_globalLock( manager->session );
171}
172
173static void
174managerUnlock( const struct tr_peerMgr * manager )
175{
176    tr_globalUnlock( manager->session );
177}
178
179static void
180torrentLock( Torrent * torrent )
181{
182    managerLock( torrent->manager );
183}
184
185static void
186torrentUnlock( Torrent * torrent )
187{
188    managerUnlock( torrent->manager );
189}
190
191static int
192torrentIsLocked( const Torrent * t )
193{
194    return tr_globalIsLocked( t->manager->session );
195}
196
197/**
198***
199**/
200
201static int
202handshakeCompareToAddr( const void * va, const void * vb )
203{
204    const tr_handshake * a = va;
205
206    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
207}
208
209static int
210handshakeCompare( const void * a, const void * b )
211{
212    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
213}
214
215static tr_handshake*
216getExistingHandshake( tr_ptrArray      * handshakes,
217                      const tr_address * addr )
218{
219    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
220}
221
222static int
223comparePeerAtomToAddress( const void * va, const void * vb )
224{
225    const struct peer_atom * a = va;
226
227    return tr_compareAddresses( &a->addr, vb );
228}
229
230static int
231comparePeerAtoms( const void * va, const void * vb )
232{
233    const struct peer_atom * b = vb;
234
235    return comparePeerAtomToAddress( va, &b->addr );
236}
237
238/**
239***
240**/
241
242static int
243torrentCompare( const void * va,
244                const void * vb )
245{
246    const Torrent * a = va;
247    const Torrent * b = vb;
248
249    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
250}
251
252static int
253torrentCompareToHash( const void * va,
254                      const void * vb )
255{
256    const Torrent * a = va;
257    const uint8_t * b_hash = vb;
258
259    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
260}
261
262static Torrent*
263getExistingTorrent( tr_peerMgr *    manager,
264                    const uint8_t * hash )
265{
266    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
267                                             hash,
268                                             torrentCompareToHash );
269}
270
271static int
272peerCompare( const void * va, const void * vb )
273{
274    const tr_peer * a = va;
275    const tr_peer * b = vb;
276
277    return tr_compareAddresses( &a->addr, &b->addr );
278}
279
280static int
281peerCompareToAddr( const void * va, const void * vb )
282{
283    const tr_peer * a = va;
284
285    return tr_compareAddresses( &a->addr, vb );
286}
287
288static tr_peer*
289getExistingPeer( Torrent          * torrent,
290                 const tr_address * addr )
291{
292    assert( torrentIsLocked( torrent ) );
293    assert( addr );
294
295    return tr_ptrArrayFindSorted( torrent->peers, addr, peerCompareToAddr );
296}
297
298static struct peer_atom*
299getExistingAtom( const Torrent    * t,
300                 const tr_address * addr )
301{
302    assert( torrentIsLocked( t ) );
303    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
304}
305
306static tr_bool
307peerIsInUse( const Torrent    * ct,
308             const tr_address * addr )
309{
310    Torrent * t = (Torrent*) ct;
311
312    assert( torrentIsLocked ( t ) );
313
314    return getExistingPeer( t, addr )
315        || getExistingHandshake( t->outgoingHandshakes, addr )
316        || getExistingHandshake( t->manager->incomingHandshakes, addr );
317}
318
319static tr_peer*
320peerConstructor( tr_torrent * tor, const tr_address * addr )
321{
322    tr_peer * p;
323    p = tr_new0( tr_peer, 1 );
324    p->addr = *addr;
325    p->bandwidth = tr_bandwidthNew( tor->session, tor->bandwidth );
326    return p;
327}
328
329static tr_peer*
330getPeer( Torrent          * torrent,
331         const tr_address * addr )
332{
333    tr_peer * peer;
334
335    assert( torrentIsLocked( torrent ) );
336
337    peer = getExistingPeer( torrent, addr );
338
339    if( peer == NULL )
340    {
341        peer = peerConstructor( torrent->tor, addr );
342        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
343    }
344
345    return peer;
346}
347
348static void
349peerDestructor( tr_peer * peer )
350{
351    assert( peer );
352
353    if( peer->msgs != NULL )
354    {
355        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
356        tr_peerMsgsFree( peer->msgs );
357    }
358
359    tr_peerIoFree( peer->io );
360
361    tr_bitfieldFree( peer->have );
362    tr_bitfieldFree( peer->blame );
363    tr_free( peer->client );
364
365    tr_bandwidthFree( peer->bandwidth );
366
367    tr_free( peer );
368}
369
370static void
371removePeer( Torrent * t,
372            tr_peer * peer )
373{
374    tr_peer *          removed;
375    struct peer_atom * atom;
376
377    assert( torrentIsLocked( t ) );
378
379    atom = getExistingAtom( t, &peer->addr );
380    assert( atom );
381    atom->time = time( NULL );
382
383    removed = tr_ptrArrayRemoveSorted( t->peers, peer, peerCompare );
384    assert( removed == peer );
385    peerDestructor( removed );
386}
387
388static void
389removeAllPeers( Torrent * t )
390{
391    while( !tr_ptrArrayEmpty( t->peers ) )
392        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
393}
394
395static void
396torrentDestructor( void * vt )
397{
398    Torrent * t = vt;
399    uint8_t   hash[SHA_DIGEST_LENGTH];
400
401    assert( t );
402    assert( !t->isRunning );
403    assert( t->peers );
404    assert( torrentIsLocked( t ) );
405    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
406    assert( tr_ptrArrayEmpty( t->peers ) );
407
408    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
409
410    tr_timerFree( &t->reconnectTimer );
411    tr_timerFree( &t->rechokeTimer );
412    tr_timerFree( &t->refillTimer );
413
414    tr_ptrArrayFree( t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
415    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
416    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
417    tr_ptrArrayFree( t->peers, NULL );
418
419    tr_free( t->pendingRequestCount );
420    tr_free( t );
421}
422
423static void peerCallbackFunc( void * vpeer,
424                              void * vevent,
425                              void * vt );
426
427static Torrent*
428torrentConstructor( tr_peerMgr * manager,
429                    tr_torrent * tor )
430{
431    int       i;
432    Torrent * t;
433
434    t = tr_new0( Torrent, 1 );
435    t->manager = manager;
436    t->tor = tor;
437    t->pool = tr_ptrArrayNew( );
438    t->peers = tr_ptrArrayNew( );
439    t->webseeds = tr_ptrArrayNew( );
440    t->outgoingHandshakes = tr_ptrArrayNew( );
441    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
442
443    for( i = 0; i < tor->info.webseedCount; ++i )
444    {
445        tr_webseed * w =
446            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc,
447                           t );
448        tr_ptrArrayAppend( t->webseeds, w );
449    }
450
451    return t;
452}
453
454
455static int bandwidthPulse( void * vmgr );
456
457
458tr_peerMgr*
459tr_peerMgrNew( tr_session * session )
460{
461    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
462
463    m->session = session;
464    m->torrents = tr_ptrArrayNew( );
465    m->incomingHandshakes = tr_ptrArrayNew( );
466    m->finishedHandshakes = tr_ptrArrayNew( );
467    m->bandwidthTimer = tr_timerNew( session, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
468    return m;
469}
470
471void
472tr_peerMgrFree( tr_peerMgr * manager )
473{
474    tr_handshake * handshake;
475
476    managerLock( manager );
477
478    tr_timerFree( &manager->bandwidthTimer );
479
480    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
481     * the item from manager->handshakes, so this is a little roundabout... */
482    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
483        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
484
485    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
486
487    while(( handshake = tr_ptrArrayPop( manager->finishedHandshakes )))
488        tr_handshakeFree( handshake );
489
490    tr_ptrArrayFree( manager->finishedHandshakes, NULL );
491
492    /* free the torrents. */
493    tr_ptrArrayFree( manager->torrents, torrentDestructor );
494
495    managerUnlock( manager );
496    tr_free( manager );
497}
498
499static tr_peer**
500getConnectedPeers( Torrent * t, int * setmeCount )
501{
502    int i, peerCount, connectionCount;
503    tr_peer **peers;
504    tr_peer **ret;
505
506    assert( torrentIsLocked( t ) );
507
508    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
509    ret = tr_new( tr_peer *, peerCount );
510
511    for( i = connectionCount = 0; i < peerCount; ++i )
512        if( peers[i]->msgs )
513            ret[connectionCount++] = peers[i];
514
515    *setmeCount = connectionCount;
516    return ret;
517}
518
519static int
520clientIsDownloadingFrom( const tr_peer * peer )
521{
522    return peer->clientIsInterested && !peer->clientIsChoked;
523}
524
525static int
526clientIsUploadingTo( const tr_peer * peer )
527{
528    return peer->peerIsInterested && !peer->peerIsChoked;
529}
530
531/***
532****
533***/
534
535tr_bool
536tr_peerMgrPeerIsSeed( const tr_peerMgr  * mgr,
537                      const uint8_t     * torrentHash,
538                      const tr_address  * addr )
539{
540    tr_bool isSeed = FALSE;
541    const Torrent * t = NULL;
542    const struct peer_atom * atom = NULL;
543
544    t = getExistingTorrent( (tr_peerMgr*)mgr, torrentHash );
545    if( t )
546        atom = getExistingAtom( t, addr );
547    if( atom )
548        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
549
550    return isSeed;
551}
552
553/****
554*****
555*****  REFILL
556*****
557****/
558
559static void
560assertValidPiece( Torrent * t, tr_piece_index_t piece )
561{
562    assert( t );
563    assert( t->tor );
564    assert( piece < t->tor->info.pieceCount );
565}
566
567static int
568getPieceRequests( Torrent * t, tr_piece_index_t piece )
569{
570    assertValidPiece( t, piece );
571
572    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
573}
574
575static void
576incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
577{
578    assertValidPiece( t, piece );
579
580    if( t->pendingRequestCount == NULL )
581        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
582    t->pendingRequestCount[piece]++;
583}
584
585static void
586decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
587{
588    assertValidPiece( t, piece );
589
590    if( t->pendingRequestCount )
591        t->pendingRequestCount[piece]--;
592}
593
594struct tr_refill_piece
595{
596    tr_priority_t    priority;
597    uint32_t         piece;
598    uint32_t         peerCount;
599    int              random;
600    int              pendingRequestCount;
601    int              missingBlockCount;
602};
603
604static int
605compareRefillPiece( const void * aIn, const void * bIn )
606{
607    const struct tr_refill_piece * a = aIn;
608    const struct tr_refill_piece * b = bIn;
609
610    /* if one piece has a higher priority, it goes first */
611    if( a->priority != b->priority )
612        return a->priority > b->priority ? -1 : 1;
613
614    /* have a per-priority endgame */
615    if( a->pendingRequestCount != b->pendingRequestCount )
616        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
617
618    /* fewer missing pieces goes first */
619    if( a->missingBlockCount != b->missingBlockCount )
620        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
621
622    /* otherwise if one has fewer peers, it goes first */
623    if( a->peerCount != b->peerCount )
624        return a->peerCount < b->peerCount ? -1 : 1;
625
626    /* otherwise go with our random seed */
627    if( a->random != b->random )
628        return a->random < b->random ? -1 : 1;
629
630    return 0;
631}
632
633static tr_piece_index_t *
634getPreferredPieces( Torrent * t, tr_piece_index_t * pieceCount )
635{
636    const tr_torrent  * tor = t->tor;
637    const tr_info     * inf = &tor->info;
638    tr_piece_index_t    i;
639    tr_piece_index_t    poolSize = 0;
640    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
641    int                 peerCount;
642    tr_peer**           peers;
643
644    assert( torrentIsLocked( t ) );
645
646    peers = getConnectedPeers( t, &peerCount );
647
648    /* make a list of the pieces that we want but don't have */
649    for( i = 0; i < inf->pieceCount; ++i )
650        if( !tor->info.pieces[i].dnd
651                && !tr_cpPieceIsComplete( tor->completion, i ) )
652            pool[poolSize++] = i;
653
654    /* sort the pool by which to request next */
655    if( poolSize > 1 )
656    {
657        tr_piece_index_t j;
658        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
659
660        for( j = 0; j < poolSize; ++j )
661        {
662            int k;
663            const tr_piece_index_t piece = pool[j];
664            struct tr_refill_piece * setme = p + j;
665
666            setme->piece = piece;
667            setme->priority = inf->pieces[piece].priority;
668            setme->peerCount = 0;
669            setme->random = tr_cryptoWeakRandInt( INT_MAX );
670            setme->pendingRequestCount = getPieceRequests( t, piece );
671            setme->missingBlockCount
672                         = tr_cpMissingBlocksInPiece( tor->completion, piece );
673
674            for( k = 0; k < peerCount; ++k )
675            {
676                const tr_peer * peer = peers[k];
677                if( peer->peerIsInterested
678                        && !peer->clientIsChoked
679                        && tr_bitfieldHas( peer->have, piece ) )
680                    ++setme->peerCount;
681            }
682        }
683
684        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
685               compareRefillPiece );
686
687        for( j = 0; j < poolSize; ++j )
688            pool[j] = p[j].piece;
689
690        tr_free( p );
691    }
692
693    tr_free( peers );
694
695    *pieceCount = poolSize;
696    return pool;
697}
698
699struct tr_blockIterator
700{
701    Torrent * t;
702    tr_block_index_t blockIndex, blockCount, *blocks;
703    tr_piece_index_t pieceIndex, pieceCount, *pieces;
704};
705
706static struct tr_blockIterator*
707blockIteratorNew( Torrent * t )
708{
709    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
710    i->t = t;
711    i->pieces = getPreferredPieces( t, &i->pieceCount );
712    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCount );
713    return i;
714}
715
716static int
717blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
718{
719    int found;
720    Torrent * t = i->t;
721    tr_torrent * tor = t->tor;
722
723    while( ( i->blockIndex == i->blockCount )
724        && ( i->pieceIndex < i->pieceCount ) )
725    {
726        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
727        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
728        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
729        tr_block_index_t block;
730
731        assert( index < tor->info.pieceCount );
732
733        i->blockCount = 0;
734        i->blockIndex = 0;
735        for( block=b; block!=e; ++block )
736            if( !tr_cpBlockIsComplete( tor->completion, block ) )
737                i->blocks[i->blockCount++] = block;
738    }
739
740    if(( found = ( i->blockIndex < i->blockCount )))
741        *setme = i->blocks[i->blockIndex++];
742
743    return found;
744}
745
746static void
747blockIteratorFree( struct tr_blockIterator * i )
748{
749    tr_free( i->blocks );
750    tr_free( i->pieces );
751    tr_free( i );
752}
753
754static tr_peer**
755getPeersUploadingToClient( Torrent * t,
756                           int *     setmeCount )
757{
758    int j;
759    int peerCount = 0;
760    int retCount = 0;
761    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
762    tr_peer ** ret = tr_new( tr_peer *, peerCount );
763
764    j = 0; /* this is a temporary test to make sure we walk through all the peers */
765    if( peerCount )
766    {
767        /* Get a list of peers we're downloading from.
768           Pick a different starting point each time so all peers
769           get a chance at being the first in line */
770        const int fencepost = tr_cryptoWeakRandInt( peerCount );
771        int i = fencepost;
772        do {
773            if( clientIsDownloadingFrom( peers[i] ) )
774                ret[retCount++] = peers[i];
775            i = ( i + 1 ) % peerCount;
776            ++j;
777        } while( i != fencepost );
778    }
779    assert( j == peerCount );
780    *setmeCount = retCount;
781    return ret;
782}
783
784static uint32_t
785getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
786{
787    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
788    const uint64_t blockPos = tor->blockSize * b;
789    assert( blockPos >= piecePos );
790    return (uint32_t)( blockPos - piecePos );
791}
792
793static int
794refillPulse( void * vtorrent )
795{
796    tr_block_index_t block;
797    int peerCount;
798    int webseedCount;
799    tr_peer ** peers;
800    tr_webseed ** webseeds;
801    struct tr_blockIterator * blockIterator;
802    Torrent * t = vtorrent;
803    tr_torrent * tor = t->tor;
804
805    if( !t->isRunning )
806        return TRUE;
807    if( tr_torrentIsSeed( t->tor ) )
808        return TRUE;
809
810    torrentLock( t );
811    tordbg( t, "Refilling Request Buffers..." );
812
813    blockIterator = blockIteratorNew( t );
814    peers = getPeersUploadingToClient( t, &peerCount );
815    webseedCount = tr_ptrArraySize( t->webseeds );
816    webseeds = tr_memdup( tr_ptrArrayBase( t->webseeds ),
817                          webseedCount * sizeof( tr_webseed* ) );
818
819    while( ( webseedCount || peerCount )
820        && blockIteratorNext( blockIterator, &block ) )
821    {
822        int j;
823        int handled = FALSE;
824
825        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
826        const uint32_t offset = getBlockOffsetInPiece( tor, block );
827        const uint32_t length = tr_torBlockCountBytes( tor, block );
828
829        /* find a peer who can ask for this block */
830        for( j=0; !handled && j<peerCount; )
831        {
832            const tr_addreq_t val = tr_peerMsgsAddRequest( peers[j]->msgs, index, offset, length );
833            switch( val )
834            {
835                case TR_ADDREQ_FULL:
836                case TR_ADDREQ_CLIENT_CHOKED:
837                    peers[j] = peers[--peerCount];
838                    break;
839
840                case TR_ADDREQ_MISSING:
841                case TR_ADDREQ_DUPLICATE:
842                    ++j;
843                    break;
844
845                case TR_ADDREQ_OK:
846                    incrementPieceRequests( t, index );
847                    handled = TRUE;
848                    break;
849
850                default:
851                    assert( 0 && "unhandled value" );
852                    break;
853            }
854        }
855
856        /* maybe one of the webseeds can do it */
857        for( j=0; !handled && j<webseedCount; )
858        {
859            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j], index, offset, length );
860            switch( val )
861            {
862                case TR_ADDREQ_FULL:
863                    webseeds[j] = webseeds[--webseedCount];
864                    break;
865
866                case TR_ADDREQ_OK:
867                    incrementPieceRequests( t, index );
868                    handled = TRUE;
869                    break;
870
871                default:
872                    assert( 0 && "unhandled value" );
873                    break;
874            }
875        }
876    }
877
878    /* cleanup */
879    blockIteratorFree( blockIterator );
880    tr_free( webseeds );
881    tr_free( peers );
882
883    t->refillTimer = NULL;
884    torrentUnlock( t );
885    return FALSE;
886}
887
888static void
889broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
890{
891    int i, size;
892    tr_peer ** peers;
893
894    assert( torrentIsLocked( t ) );
895
896    tordbg( t, "got a block; cancelling any duplicate requests from peers %"PRIu32":%"PRIu32"->%"PRIu32, index, offset, length );
897    peers = getConnectedPeers( t, &size );
898    for( i=0; i<size; ++i )
899        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
900    tr_free( peers );
901}
902
903static void
904addStrike( Torrent * t,
905           tr_peer * peer )
906{
907    tordbg( t, "increasing peer %s strike count to %d",
908            tr_peerIoAddrStr( &peer->addr,
909                              peer->port ), peer->strikes + 1 );
910
911    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
912    {
913        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
914        atom->myflags |= MYFLAG_BANNED;
915        peer->doPurge = 1;
916        tordbg( t, "banning peer %s", tr_peerIoAddrStr( &atom->addr, atom->port ) );
917    }
918}
919
920static void
921gotBadPiece( Torrent *        t,
922             tr_piece_index_t pieceIndex )
923{
924    tr_torrent *   tor = t->tor;
925    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
926
927    tor->corruptCur += byteCount;
928    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
929}
930
931static void
932refillSoon( Torrent * t )
933{
934    if( t->refillTimer == NULL )
935        t->refillTimer = tr_timerNew( t->manager->session,
936                                      refillPulse, t,
937                                      REFILL_PERIOD_MSEC );
938}
939
940static void
941peerSuggestedPiece( Torrent            * t UNUSED,
942                    tr_peer            * peer UNUSED,
943                    tr_piece_index_t     pieceIndex UNUSED,
944                    int                  isFastAllowed UNUSED )
945{
946#if 0
947    assert( t );
948    assert( peer );
949    assert( peer->msgs );
950
951    /* is this a valid piece? */
952    if(  pieceIndex >= t->tor->info.pieceCount )
953        return;
954
955    /* don't ask for it if we've already got it */
956    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
957        return;
958
959    /* don't ask for it if they don't have it */
960    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
961        return;
962
963    /* don't ask for it if we're choked and it's not fast */
964    if( !isFastAllowed && peer->clientIsChoked )
965        return;
966
967    /* request the blocks that we don't have in this piece */
968    {
969        tr_block_index_t block;
970        const tr_torrent * tor = t->tor;
971        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
972        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
973
974        for( block=start; block<end; ++block )
975        {
976            if( !tr_cpBlockIsComplete( tor->completion, block ) )
977            {
978                const uint32_t offset = getBlockOffsetInPiece( tor, block );
979                const uint32_t length = tr_torBlockCountBytes( tor, block );
980                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
981                incrementPieceRequests( t, pieceIndex );
982            }
983        }
984    }
985#endif
986}
987
988static void
989peerCallbackFunc( void * vpeer, void * vevent, void * vt )
990{
991    tr_peer * peer = vpeer; /* may be NULL if peer is a webseed */
992    Torrent * t = vt;
993    const tr_peer_event * e = vevent;
994
995    torrentLock( t );
996
997    switch( e->eventType )
998    {
999        case TR_PEER_UPLOAD_ONLY:
1000            /* update our atom */
1001            if( peer ) {
1002                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1003                a->uploadOnly = e->uploadOnly ? UPLOAD_ONLY_YES : UPLOAD_ONLY_NO;
1004            }
1005            break;
1006
1007        case TR_PEER_NEED_REQ:
1008            refillSoon( t );
1009            break;
1010
1011        case TR_PEER_CANCEL:
1012            decrementPieceRequests( t, e->pieceIndex );
1013            break;
1014
1015        case TR_PEER_PEER_GOT_DATA:
1016        {
1017            const time_t now = time( NULL );
1018            tr_torrent * tor = t->tor;
1019
1020            tor->activityDate = now;
1021
1022            if( e->wasPieceData )
1023                tor->uploadedCur += e->length;
1024
1025            /* update the stats */
1026            if( e->wasPieceData )
1027                tr_statsAddUploaded( tor->session, e->length );
1028
1029            /* update our atom */
1030            if( peer ) {
1031                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1032                if( e->wasPieceData )
1033                    a->piece_data_time = now;
1034            }
1035
1036            break;
1037        }
1038
1039        case TR_PEER_CLIENT_GOT_SUGGEST:
1040            if( peer )
1041                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1042            break;
1043
1044        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1045            if( peer )
1046                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1047            break;
1048
1049        case TR_PEER_CLIENT_GOT_DATA:
1050        {
1051            const time_t now = time( NULL );
1052            tr_torrent * tor = t->tor;
1053            tor->activityDate = now;
1054
1055            /* only add this to downloadedCur if we got it from a peer --
1056             * webseeds shouldn't count against our ratio.  As one tracker
1057             * admin put it, "Those pieces are downloaded directly from the
1058             * content distributor, not the peers, it is the tracker's job
1059             * to manage the swarms, not the web server and does not fit
1060             * into the jurisdiction of the tracker." */
1061            if( peer && e->wasPieceData )
1062                tor->downloadedCur += e->length;
1063
1064            /* update the stats */ 
1065            if( e->wasPieceData )
1066                tr_statsAddDownloaded( tor->session, e->length );
1067
1068            /* update our atom */
1069            if( peer ) {
1070                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1071                if( e->wasPieceData )
1072                    a->piece_data_time = now;
1073            }
1074
1075            break;
1076        }
1077
1078        case TR_PEER_PEER_PROGRESS:
1079        {
1080            if( peer )
1081            {
1082                struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1083                const int peerIsSeed = e->progress >= 1.0;
1084                if( peerIsSeed ) {
1085                    tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1086                    atom->flags |= ADDED_F_SEED_FLAG;
1087                } else {
1088                    tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1089                    atom->flags &= ~ADDED_F_SEED_FLAG;
1090                }
1091            }
1092            break;
1093        }
1094
1095        case TR_PEER_CLIENT_GOT_BLOCK:
1096        {
1097            tr_torrent *     tor = t->tor;
1098
1099            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1100
1101            tr_cpBlockAdd( tor->completion, block );
1102            decrementPieceRequests( t, e->pieceIndex );
1103
1104            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1105
1106            if( tr_cpPieceIsComplete( tor->completion, e->pieceIndex ) )
1107            {
1108                const tr_piece_index_t p = e->pieceIndex;
1109                const tr_bool ok = tr_ioTestPiece( tor, p );
1110
1111                if( !ok )
1112                {
1113                    tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1114                               (unsigned long)p );
1115                }
1116
1117                tr_torrentSetHasPiece( tor, p, ok );
1118                tr_torrentSetPieceChecked( tor, p, TRUE );
1119                tr_peerMgrSetBlame( tor->session->peerMgr, tor->info.hash, p, ok );
1120
1121                if( !ok )
1122                    gotBadPiece( t, p );
1123                else
1124                {
1125                    int i, peerCount;
1126                    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1127                    for( i = 0; i < peerCount; ++i )
1128                        tr_peerMsgsHave( peers[i]->msgs, p );
1129                    tr_free( peers );
1130                }
1131
1132                tr_torrentRecheckCompleteness( tor );
1133            }
1134            break;
1135        }
1136
1137        case TR_PEER_ERROR:
1138            if( e->err == EINVAL )
1139            {
1140                addStrike( t, peer );
1141                peer->doPurge = 1;
1142                tordbg( t, "setting %s doPurge flag because we got an EINVAL error", tr_peerIoAddrStr( &peer->addr, peer->port ) );
1143            }
1144            else if( ( e->err == ERANGE )
1145                  || ( e->err == EMSGSIZE )
1146                  || ( e->err == ENOTCONN ) )
1147            {
1148                /* some protocol error from the peer */
1149                peer->doPurge = 1;
1150                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error", tr_peerIoAddrStr( &peer->addr, peer->port ) );
1151            }
1152            else /* a local error, such as an IO error */
1153            {
1154                t->tor->error = e->err;
1155                tr_strlcpy( t->tor->errorString,
1156                            tr_strerror( t->tor->error ),
1157                            sizeof( t->tor->errorString ) );
1158                tr_torrentStop( t->tor );
1159            }
1160            break;
1161
1162        default:
1163            assert( 0 );
1164    }
1165
1166    torrentUnlock( t );
1167}
1168
1169static void
1170ensureAtomExists( Torrent          * t,
1171                  const tr_address * addr,
1172                  tr_port            port,
1173                  uint8_t            flags,
1174                  uint8_t            from )
1175{
1176    if( getExistingAtom( t, addr ) == NULL )
1177    {
1178        struct peer_atom * a;
1179        a = tr_new0( struct peer_atom, 1 );
1180        a->addr = *addr;
1181        a->port = port;
1182        a->flags = flags;
1183        a->from = from;
1184        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr( &a->addr, a->port ) );
1185        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
1186    }
1187}
1188
1189static int
1190getMaxPeerCount( const tr_torrent * tor )
1191{
1192    return tor->maxConnectedPeers;
1193}
1194
1195static int
1196getPeerCount( const Torrent * t )
1197{
1198    return tr_ptrArraySize( t->peers ) + tr_ptrArraySize( t->outgoingHandshakes );
1199}
1200
1201/* FIXME: this is kind of a mess. */
1202static tr_bool
1203myHandshakeDoneCB( tr_handshake  * handshake,
1204                   tr_peerIo     * io,
1205                   int             isConnected,
1206                   const uint8_t * peer_id,
1207                   void          * vmanager )
1208{
1209    tr_bool            ok = isConnected;
1210    tr_bool            success = FALSE;
1211    tr_port            port;
1212    const tr_address * addr;
1213    tr_peerMgr       * manager = vmanager;
1214    Torrent          * t;
1215    tr_handshake     * ours;
1216
1217    assert( io );
1218    assert( isConnected == 0 || isConnected == 1 );
1219
1220    t = tr_peerIoHasTorrentHash( io )
1221        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1222        : NULL;
1223
1224    if( tr_peerIoIsIncoming ( io ) )
1225        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
1226                                        handshake, handshakeCompare );
1227    else if( t )
1228        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
1229                                        handshake, handshakeCompare );
1230    else
1231        ours = handshake;
1232
1233    assert( ours );
1234    assert( ours == handshake );
1235
1236    if( t )
1237        torrentLock( t );
1238
1239    addr = tr_peerIoGetAddress( io, &port );
1240
1241    if( !ok || !t || !t->isRunning )
1242    {
1243        if( t )
1244        {
1245            struct peer_atom * atom = getExistingAtom( t, addr );
1246            if( atom )
1247                ++atom->numFails;
1248        }
1249    }
1250    else /* looking good */
1251    {
1252        struct peer_atom * atom;
1253        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1254        atom = getExistingAtom( t, addr );
1255        atom->time = time( NULL );
1256        atom->piece_data_time = 0;
1257
1258        if( atom->myflags & MYFLAG_BANNED )
1259        {
1260            tordbg( t, "banned peer %s tried to reconnect",
1261                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
1262        }
1263        else if( tr_peerIoIsIncoming( io )
1264               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1265
1266        {
1267        }
1268        else
1269        {
1270            tr_peer * peer = getExistingPeer( t, addr );
1271
1272            if( peer ) /* we already have this peer */
1273            {
1274            }
1275            else
1276            {
1277                peer = getPeer( t, addr );
1278                tr_free( peer->client );
1279
1280                if( !peer_id )
1281                    peer->client = NULL;
1282                else {
1283                    char client[128];
1284                    tr_clientForId( client, sizeof( client ), peer_id );
1285                    peer->client = tr_strdup( client );
1286                }
1287
1288                peer->port = port;
1289                peer->io = tr_handshakeStealIO( handshake );
1290                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1291                tr_peerIoSetBandwidth( io, peer->bandwidth );
1292
1293                success = TRUE;
1294            }
1295        }
1296    }
1297
1298    if( !success )
1299        tr_ptrArrayAppend( manager->finishedHandshakes, handshake );
1300
1301    if( t )
1302        torrentUnlock( t );
1303
1304    return success;
1305}
1306
1307void
1308tr_peerMgrAddIncoming( tr_peerMgr * manager,
1309                       tr_address * addr,
1310                       tr_port      port,
1311                       int          socket )
1312{
1313    managerLock( manager );
1314
1315    if( tr_sessionIsAddressBlocked( manager->session, addr ) )
1316    {
1317        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
1318        tr_netClose( socket );
1319    }
1320    else if( getExistingHandshake( manager->incomingHandshakes, addr ) )
1321    {
1322        tr_netClose( socket );
1323    }
1324    else /* we don't have a connetion to them yet... */
1325    {
1326        tr_peerIo *    io;
1327        tr_handshake * handshake;
1328
1329        io = tr_peerIoNewIncoming( manager->session, addr, port, socket );
1330
1331        handshake = tr_handshakeNew( io,
1332                                     manager->session->encryptionMode,
1333                                     myHandshakeDoneCB,
1334                                     manager );
1335
1336        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake,
1337                                 handshakeCompare );
1338    }
1339
1340    managerUnlock( manager );
1341}
1342
1343static tr_bool
1344tr_isPex( const tr_pex * pex )
1345{
1346    return pex && tr_isAddress( &pex->addr );
1347}
1348
1349void
1350tr_peerMgrAddPex( tr_peerMgr *    manager,
1351                  const uint8_t * torrentHash,
1352                  uint8_t         from,
1353                  const tr_pex *  pex )
1354{
1355    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
1356    {
1357        Torrent * t;
1358        managerLock( manager );
1359
1360        t = getExistingTorrent( manager, torrentHash );
1361        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
1362            ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from );
1363
1364        managerUnlock( manager );
1365    }
1366}
1367
1368tr_pex *
1369tr_peerMgrCompactToPex( const void *    compact,
1370                        size_t          compactLen,
1371                        const uint8_t * added_f,
1372                        size_t          added_f_len,
1373                        size_t *        pexCount )
1374{
1375    size_t          i;
1376    size_t          n = compactLen / 6;
1377    const uint8_t * walk = compact;
1378    tr_pex *        pex = tr_new0( tr_pex, n );
1379
1380    for( i = 0; i < n; ++i )
1381    {
1382        pex[i].addr.type = TR_AF_INET;
1383        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
1384        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1385        if( added_f && ( n == added_f_len ) )
1386            pex[i].flags = added_f[i];
1387    }
1388
1389    *pexCount = n;
1390    return pex;
1391}
1392
1393tr_pex *
1394tr_peerMgrCompact6ToPex( const void    * compact,
1395                         size_t          compactLen,
1396                         const uint8_t * added_f,
1397                         size_t          added_f_len,
1398                         size_t        * pexCount )
1399{
1400    size_t          i;
1401    size_t          n = compactLen / 18;
1402    const uint8_t * walk = compact;
1403    tr_pex *        pex = tr_new0( tr_pex, n );
1404   
1405    for( i = 0; i < n; ++i )
1406    {
1407        pex[i].addr.type = TR_AF_INET6;
1408        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
1409        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1410        if( added_f && ( n == added_f_len ) )
1411            pex[i].flags = added_f[i];
1412    }
1413   
1414    *pexCount = n;
1415    return pex;
1416}
1417
1418tr_pex *
1419tr_peerMgrArrayToPex( const void * array,
1420                      size_t       arrayLen,
1421                      size_t      * pexCount )
1422{
1423    size_t          i;
1424    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
1425    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
1426    const uint8_t * walk = array;
1427    tr_pex        * pex = tr_new0( tr_pex, n );
1428   
1429    for( i = 0 ; i < n ; i++ ) {
1430        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
1431        tr_suspectAddress( &pex[i].addr, "tracker"  );
1432        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
1433        pex[i].flags = 0x00;
1434        walk += sizeof( tr_address ) + 2;
1435    }
1436   
1437    *pexCount = n;
1438    return pex;
1439}
1440
1441/**
1442***
1443**/
1444
1445void
1446tr_peerMgrSetBlame( tr_peerMgr *     manager,
1447                    const uint8_t *  torrentHash,
1448                    tr_piece_index_t pieceIndex,
1449                    int              success )
1450{
1451    if( !success )
1452    {
1453        int        peerCount, i;
1454        Torrent *  t = getExistingTorrent( manager, torrentHash );
1455        tr_peer ** peers;
1456
1457        assert( torrentIsLocked( t ) );
1458
1459        peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1460        for( i = 0; i < peerCount; ++i )
1461        {
1462            tr_peer * peer = peers[i];
1463            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1464            {
1465                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1466                        tr_peerIoAddrStr( &peer->addr, peer->port ),
1467                        pieceIndex, (int)peer->strikes + 1 );
1468                addStrike( t, peer );
1469            }
1470        }
1471    }
1472}
1473
1474int
1475tr_pexCompare( const void * va, const void * vb )
1476{
1477    const tr_pex * a = va;
1478    const tr_pex * b = vb;
1479    int i;
1480
1481    assert( tr_isPex( a ) );
1482    assert( tr_isPex( b ) );
1483
1484    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
1485        return i;
1486
1487    if( a->port != b->port )
1488        return a->port < b->port ? -1 : 1;
1489
1490    return 0;
1491}
1492
1493static int
1494peerPrefersCrypto( const tr_peer * peer )
1495{
1496    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1497        return TRUE;
1498
1499    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1500        return FALSE;
1501
1502    return tr_peerIoIsEncrypted( peer->io );
1503}
1504
1505int
1506tr_peerMgrGetPeers( tr_peerMgr      * manager,
1507                    const uint8_t   * torrentHash,
1508                    tr_pex         ** setme_pex,
1509                    uint8_t           af)
1510{
1511    int peerCount = 0;
1512    int peersReturning = 0;
1513    const Torrent *  t;
1514
1515    managerLock( manager );
1516
1517    t = getExistingTorrent( manager, torrentHash );
1518    if( t == NULL )
1519    {
1520        *setme_pex = NULL;
1521    }
1522    else
1523    {
1524        int i;
1525        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1526        /* for now, this will waste memory on torrents that have both
1527         * ipv6 and ipv4 peers */
1528        tr_pex * pex = tr_new( tr_pex, peerCount );
1529        tr_pex * walk = pex;
1530
1531        for( i=0; i<peerCount; ++i )
1532        {
1533            const tr_peer * peer = peers[i];
1534            if( peer->addr.type == af )
1535            {
1536                const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1537
1538                assert( tr_isAddress( &peer->addr ) );
1539                walk->addr = peer->addr;
1540                walk->port = peer->port;
1541                walk->flags = 0;
1542                if( peerPrefersCrypto( peer ) )
1543                    walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1544                if( ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 ) )
1545                    walk->flags |= ADDED_F_SEED_FLAG;
1546                ++peersReturning;
1547                ++walk;
1548            }
1549        }
1550
1551        assert( ( walk - pex ) == peersReturning );
1552        qsort( pex, peersReturning, sizeof( tr_pex ), tr_pexCompare );
1553        *setme_pex = pex;
1554    }
1555
1556    managerUnlock( manager );
1557    return peersReturning;
1558}
1559
1560static int reconnectPulse( void * vtorrent );
1561
1562static int rechokePulse( void * vtorrent );
1563
1564void
1565tr_peerMgrStartTorrent( tr_peerMgr *    manager,
1566                        const uint8_t * torrentHash )
1567{
1568    Torrent * t;
1569
1570    managerLock( manager );
1571
1572    t = getExistingTorrent( manager, torrentHash );
1573
1574    assert( t );
1575    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1576    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1577
1578    if( !t->isRunning )
1579    {
1580        t->isRunning = 1;
1581
1582        t->reconnectTimer = tr_timerNew( t->manager->session,
1583                                         reconnectPulse, t,
1584                                         RECONNECT_PERIOD_MSEC );
1585
1586        t->rechokeTimer = tr_timerNew( t->manager->session,
1587                                       rechokePulse, t,
1588                                       RECHOKE_PERIOD_MSEC );
1589
1590        reconnectPulse( t );
1591
1592        rechokePulse( t );
1593
1594        if( !tr_ptrArrayEmpty( t->webseeds ) )
1595            refillSoon( t );
1596    }
1597
1598    managerUnlock( manager );
1599}
1600
1601static void
1602stopTorrent( Torrent * t )
1603{
1604    assert( torrentIsLocked( t ) );
1605
1606    t->isRunning = 0;
1607    tr_timerFree( &t->rechokeTimer );
1608    tr_timerFree( &t->reconnectTimer );
1609
1610    /* disconnect the peers. */
1611    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)peerDestructor );
1612    tr_ptrArrayClear( t->peers );
1613
1614    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1615     * which removes the handshake from t->outgoingHandshakes... */
1616    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1617        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1618}
1619
1620void
1621tr_peerMgrStopTorrent( tr_peerMgr *    manager,
1622                       const uint8_t * torrentHash )
1623{
1624    managerLock( manager );
1625
1626    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1627
1628    managerUnlock( manager );
1629}
1630
1631void
1632tr_peerMgrAddTorrent( tr_peerMgr * manager,
1633                      tr_torrent * tor )
1634{
1635    Torrent * t;
1636
1637    managerLock( manager );
1638
1639    assert( tor );
1640    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1641
1642    t = torrentConstructor( manager, tor );
1643    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1644
1645    managerUnlock( manager );
1646}
1647
1648void
1649tr_peerMgrRemoveTorrent( tr_peerMgr *    manager,
1650                         const uint8_t * torrentHash )
1651{
1652    Torrent * t;
1653
1654    managerLock( manager );
1655
1656    t = getExistingTorrent( manager, torrentHash );
1657    assert( t );
1658    stopTorrent( t );
1659    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1660    torrentDestructor( t );
1661
1662    managerUnlock( manager );
1663}
1664
1665void
1666tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1667                               const uint8_t *    torrentHash,
1668                               int8_t *           tab,
1669                               unsigned int       tabCount )
1670{
1671    tr_piece_index_t   i;
1672    const Torrent *    t;
1673    const tr_torrent * tor;
1674    float              interval;
1675    tr_bool            isSeed;
1676    int                peerCount;
1677    const tr_peer **   peers;
1678
1679    managerLock( manager );
1680
1681    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1682    tor = t->tor;
1683    interval = tor->info.pieceCount / (float)tabCount;
1684    isSeed = tor && ( tr_cpGetStatus ( tor->completion ) == TR_SEED );
1685    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1686
1687    memset( tab, 0, tabCount );
1688
1689    for( i = 0; tor && i < tabCount; ++i )
1690    {
1691        const int piece = i * interval;
1692
1693        if( isSeed || tr_cpPieceIsComplete( tor->completion, piece ) )
1694            tab[i] = -1;
1695        else if( peerCount ) {
1696            int j;
1697            for( j = 0; j < peerCount; ++j )
1698                if( tr_bitfieldHas( peers[j]->have, i ) )
1699                    ++tab[i];
1700        }
1701    }
1702
1703    managerUnlock( manager );
1704}
1705
1706/* Returns the pieces that are available from peers */
1707tr_bitfield*
1708tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1709                        const uint8_t *    torrentHash )
1710{
1711    int           i, size;
1712    Torrent *     t;
1713    tr_peer **    peers;
1714    tr_bitfield * pieces;
1715    managerLock( manager );
1716
1717    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1718    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1719    peers = getConnectedPeers( t, &size );
1720    for( i = 0; i < size; ++i )
1721        tr_bitfieldOr( pieces, peers[i]->have );
1722
1723    managerUnlock( manager );
1724    tr_free( peers );
1725    return pieces;
1726}
1727
1728int
1729tr_peerMgrHasConnections( const tr_peerMgr * manager,
1730                          const uint8_t *    torrentHash )
1731{
1732    int ret;
1733    const Torrent * t;
1734    managerLock( manager );
1735
1736    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1737    ret = t && ( !tr_ptrArrayEmpty( t->peers ) || !tr_ptrArrayEmpty( t->webseeds ) );
1738
1739    managerUnlock( manager );
1740    return ret;
1741}
1742
1743void
1744tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1745                        const uint8_t    * torrentHash,
1746                        int              * setmePeersKnown,
1747                        int              * setmePeersConnected,
1748                        int              * setmeSeedsConnected,
1749                        int              * setmeWebseedsSendingToUs,
1750                        int              * setmePeersSendingToUs,
1751                        int              * setmePeersGettingFromUs,
1752                        int              * setmePeersFrom )
1753{
1754    int i, size;
1755    const Torrent * t;
1756    const tr_peer ** peers;
1757    const tr_webseed ** webseeds;
1758
1759    managerLock( manager );
1760
1761    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1762    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1763
1764    *setmePeersKnown           = tr_ptrArraySize( t->pool );
1765    *setmePeersConnected       = 0;
1766    *setmeSeedsConnected       = 0;
1767    *setmePeersGettingFromUs   = 0;
1768    *setmePeersSendingToUs     = 0;
1769    *setmeWebseedsSendingToUs  = 0;
1770
1771    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1772        setmePeersFrom[i] = 0;
1773
1774    for( i=0; i<size; ++i )
1775    {
1776        const tr_peer * peer = peers[i];
1777        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1778
1779        if( peer->io == NULL ) /* not connected */
1780            continue;
1781
1782        ++*setmePeersConnected;
1783
1784        ++setmePeersFrom[atom->from];
1785
1786        if( clientIsDownloadingFrom( peer ) )
1787            ++*setmePeersSendingToUs;
1788
1789        if( clientIsUploadingTo( peer ) )
1790            ++*setmePeersGettingFromUs;
1791
1792        if( atom->flags & ADDED_F_SEED_FLAG )
1793            ++*setmeSeedsConnected;
1794    }
1795
1796    webseeds = (const tr_webseed **) tr_ptrArrayPeek( t->webseeds, &size );
1797    for( i=0; i<size; ++i )
1798        if( tr_webseedIsActive( webseeds[i] ) )
1799            ++*setmeWebseedsSendingToUs;
1800
1801    managerUnlock( manager );
1802}
1803
1804float*
1805tr_peerMgrWebSpeeds( const tr_peerMgr * manager,
1806                     const uint8_t *    torrentHash )
1807{
1808    const Torrent * t;
1809    const tr_webseed ** webseeds;
1810    int i;
1811    int webseedCount;
1812    float * ret;
1813
1814    assert( manager );
1815    managerLock( manager );
1816
1817    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1818    webseeds = (const tr_webseed**) tr_ptrArrayPeek( t->webseeds,
1819                                                     &webseedCount );
1820    assert( webseedCount == t->tor->info.webseedCount );
1821    ret = tr_new0( float, webseedCount );
1822
1823    for( i=0; i<webseedCount; ++i )
1824        if( !tr_webseedGetSpeed( webseeds[i], &ret[i] ) )
1825            ret[i] = -1.0;
1826
1827    managerUnlock( manager );
1828    return ret;
1829}
1830
1831double
1832tr_peerGetPieceSpeed( const tr_peer * peer, tr_direction direction )
1833{
1834    assert( peer );
1835    assert( direction==TR_CLIENT_TO_PEER || direction==TR_PEER_TO_CLIENT );
1836
1837    return tr_bandwidthGetPieceSpeed( peer->bandwidth, direction );
1838}
1839
1840
1841struct tr_peer_stat *
1842tr_peerMgrPeerStats( const   tr_peerMgr  * manager,
1843                     const   uint8_t     * torrentHash,
1844                     int                 * setmeCount UNUSED )
1845{
1846    int             i, size;
1847    const Torrent * t;
1848    tr_peer **      peers;
1849    tr_peer_stat *  ret;
1850
1851    assert( manager );
1852    managerLock( manager );
1853
1854    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1855    peers = getConnectedPeers( (Torrent*)t, &size );
1856    ret = tr_new0( tr_peer_stat, size );
1857
1858    for( i = 0; i < size; ++i )
1859    {
1860        char *                   pch;
1861        const tr_peer *          peer = peers[i];
1862        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1863        tr_peer_stat *           stat = ret + i;
1864        tr_address               norm_addr;
1865
1866        norm_addr = peer->addr;
1867        tr_normalizeV4Mapped( &norm_addr );
1868        tr_ntop( &norm_addr, stat->addr, sizeof( stat->addr ) );
1869        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
1870                   sizeof( stat->client ) );
1871        stat->port               = ntohs( peer->port );
1872        stat->from               = atom->from;
1873        stat->progress           = peer->progress;
1874        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1875        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER );
1876        stat->rateToClient       = tr_peerGetPieceSpeed( peer, TR_PEER_TO_CLIENT );
1877        stat->peerIsChoked       = peer->peerIsChoked;
1878        stat->peerIsInterested   = peer->peerIsInterested;
1879        stat->clientIsChoked     = peer->clientIsChoked;
1880        stat->clientIsInterested = peer->clientIsInterested;
1881        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1882        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1883        stat->isUploadingTo      = clientIsUploadingTo( peer );
1884        stat->isSeed             = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
1885
1886        pch = stat->flagStr;
1887        if( t->optimistic == peer ) *pch++ = 'O';
1888        if( stat->isDownloadingFrom ) *pch++ = 'D';
1889        else if( stat->clientIsInterested ) *pch++ = 'd';
1890        if( stat->isUploadingTo ) *pch++ = 'U';
1891        else if( stat->peerIsInterested ) *pch++ = 'u';
1892        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
1893        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1894        if( stat->isEncrypted ) *pch++ = 'E';
1895        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1896        if( stat->isIncoming ) *pch++ = 'I';
1897        *pch = '\0';
1898    }
1899
1900    *setmeCount = size;
1901    tr_free( peers );
1902
1903    managerUnlock( manager );
1904    return ret;
1905}
1906
1907/**
1908***
1909**/
1910
1911struct ChokeData
1912{
1913    tr_bool         doUnchoke;
1914    tr_bool         isInterested;
1915    tr_bool         isChoked;
1916    int             rate;
1917    tr_peer *       peer;
1918};
1919
1920static int
1921compareChoke( const void * va,
1922              const void * vb )
1923{
1924    const struct ChokeData * a = va;
1925    const struct ChokeData * b = vb;
1926
1927    if( a->rate != b->rate ) /* prefer higher overall speeds */
1928        return a->rate > b->rate ? -1 : 1;
1929
1930    if( a->isChoked != b->isChoked ) /* prefer unchoked */
1931        return a->isChoked ? 1 : -1;
1932
1933    return 0;
1934}
1935
1936static int
1937isNew( const tr_peer * peer )
1938{
1939    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1940}
1941
1942static int
1943isSame( const tr_peer * peer )
1944{
1945    return peer && peer->client && strstr( peer->client, "Transmission" );
1946}
1947
1948/**
1949***
1950**/
1951
1952static void
1953rechoke( Torrent * t )
1954{
1955    int                i, peerCount, size, unchokedInterested;
1956    tr_peer **         peers = getConnectedPeers( t, &peerCount );
1957    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1958    const int          chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
1959
1960    assert( torrentIsLocked( t ) );
1961
1962    /* sort the peers by preference and rate */
1963    for( i = 0, size = 0; i < peerCount; ++i )
1964    {
1965        tr_peer * peer = peers[i];
1966        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1967
1968        if( peer->progress >= 1.0 ) /* choke all seeds */
1969        {
1970            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1971        }
1972        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
1973        {
1974            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1975        }
1976        else if( chokeAll ) /* choke everyone if we're not uploading */
1977        {
1978            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1979        }
1980        else
1981        {
1982            struct ChokeData * n = &choke[size++];
1983            n->peer         = peer;
1984            n->isInterested = peer->peerIsInterested;
1985            n->isChoked     = peer->peerIsChoked;
1986            n->rate         = tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER ) * 1024;
1987        }
1988    }
1989
1990    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
1991
1992    /**
1993     * Reciprocation and number of uploads capping is managed by unchoking
1994     * the N peers which have the best upload rate and are interested.
1995     * This maximizes the client's download rate. These N peers are
1996     * referred to as downloaders, because they are interested in downloading
1997     * from the client.
1998     *
1999     * Peers which have a better upload rate (as compared to the downloaders)
2000     * but aren't interested get unchoked. If they become interested, the
2001     * downloader with the worst upload rate gets choked. If a client has
2002     * a complete file, it uses its upload rate rather than its download
2003     * rate to decide which peers to unchoke.
2004     */
2005    unchokedInterested = 0;
2006    for( i=0; i<size && unchokedInterested<MAX_UNCHOKED_PEERS; ++i ) {
2007        choke[i].doUnchoke = 1;
2008        if( choke[i].isInterested )
2009            ++unchokedInterested;
2010    }
2011
2012    /* optimistic unchoke */
2013    if( i < size )
2014    {
2015        int n;
2016        struct ChokeData * c;
2017        tr_ptrArray * randPool = tr_ptrArrayNew( );
2018
2019        for( ; i<size; ++i )
2020        {
2021            if( choke[i].isInterested )
2022            {
2023                const tr_peer * peer = choke[i].peer;
2024                int x = 1, y;
2025                if( isNew( peer ) ) x *= 3;
2026                if( isSame( peer ) ) x *= 3;
2027                for( y=0; y<x; ++y )
2028                    tr_ptrArrayAppend( randPool, &choke[i] );
2029            }
2030        }
2031
2032        if(( n = tr_ptrArraySize( randPool )))
2033        {
2034            c = tr_ptrArrayNth( randPool, tr_cryptoWeakRandInt( n ));
2035            c->doUnchoke = 1;
2036            t->optimistic = c->peer;
2037        }
2038
2039        tr_ptrArrayFree( randPool, NULL );
2040    }
2041
2042    for( i=0; i<size; ++i )
2043        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
2044
2045    /* cleanup */
2046    tr_free( choke );
2047    tr_free( peers );
2048}
2049
2050static int
2051rechokePulse( void * vtorrent )
2052{
2053    Torrent * t = vtorrent;
2054
2055    torrentLock( t );
2056    rechoke( t );
2057    torrentUnlock( t );
2058    return TRUE;
2059}
2060
2061/***
2062****
2063****  Life and Death
2064****
2065***/
2066
2067static int
2068shouldPeerBeClosed( const Torrent * t,
2069                    const tr_peer * peer,
2070                    int             peerCount )
2071{
2072    const tr_torrent *       tor = t->tor;
2073    const time_t             now = time( NULL );
2074    const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
2075
2076    /* if it's marked for purging, close it */
2077    if( peer->doPurge )
2078    {
2079        tordbg( t, "purging peer %s because its doPurge flag is set",
2080                tr_peerIoAddrStr( &atom->addr, atom->port ) );
2081        return TRUE;
2082    }
2083
2084    /* if we're seeding and the peer has everything we have,
2085     * and enough time has passed for a pex exchange, then disconnect */
2086    if( tr_torrentIsSeed( tor ) )
2087    {
2088        int peerHasEverything;
2089        if( atom->flags & ADDED_F_SEED_FLAG )
2090            peerHasEverything = TRUE;
2091        else if( peer->progress < tr_cpPercentDone( tor->completion ) )
2092            peerHasEverything = FALSE;
2093        else {
2094            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( tor->completion ) );
2095            tr_bitfieldDifference( tmp, peer->have );
2096            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2097            tr_bitfieldFree( tmp );
2098        }
2099
2100        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 )))
2101        {
2102            tordbg( t, "purging peer %s because we're both seeds",
2103                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
2104            return TRUE;
2105        }
2106    }
2107
2108    /* disconnect if it's been too long since piece data has been transferred.
2109     * this is on a sliding scale based on number of available peers... */
2110    {
2111        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2112        /* if we have >= relaxIfFewerThan, strictness is 100%.
2113         * if we have zero connections, strictness is 0% */
2114        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
2115            ? 1.0
2116            : peerCount / (float)relaxStrictnessIfFewerThanN;
2117        const int lo = MIN_UPLOAD_IDLE_SECS;
2118        const int hi = MAX_UPLOAD_IDLE_SECS;
2119        const int limit = hi - ( ( hi - lo ) * strictness );
2120        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
2121/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
2122        if( idleTime > limit ) {
2123            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
2124                       tr_peerIoAddrStr( &atom->addr, atom->port ), idleTime );
2125            return TRUE;
2126        }
2127    }
2128
2129    return FALSE;
2130}
2131
2132static tr_peer **
2133getPeersToClose( Torrent * t, int * setmeSize )
2134{
2135    int i, peerCount, outsize;
2136    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( t->peers, &peerCount );
2137    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2138
2139    assert( torrentIsLocked( t ) );
2140
2141    for( i = outsize = 0; i < peerCount; ++i )
2142        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
2143            ret[outsize++] = peers[i];
2144
2145    *setmeSize = outsize;
2146    return ret;
2147}
2148
2149static int
2150compareCandidates( const void * va,
2151                   const void * vb )
2152{
2153    const struct peer_atom * a = *(const struct peer_atom**) va;
2154    const struct peer_atom * b = *(const struct peer_atom**) vb;
2155
2156    /* <Charles> Here we would probably want to try reconnecting to
2157     * peers that had most recently given us data. Lots of users have
2158     * trouble with resets due to their routers and/or ISPs. This way we
2159     * can quickly recover from an unwanted reset. So we sort
2160     * piece_data_time in descending order.
2161     */
2162
2163    if( a->piece_data_time != b->piece_data_time )
2164        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2165
2166    if( a->numFails != b->numFails )
2167        return a->numFails < b->numFails ? -1 : 1;
2168
2169    if( a->time != b->time )
2170        return a->time < b->time ? -1 : 1;
2171
2172    /* all other things being equal, prefer peers whose
2173     * information comes from a more reliable source */
2174    if( a->from != b->from )
2175        return a->from < b->from ? -1 : 1;
2176
2177    return 0;
2178}
2179
2180static int
2181getReconnectIntervalSecs( const struct peer_atom * atom )
2182{
2183    int          sec;
2184    const time_t now = time( NULL );
2185
2186    /* if we were recently connected to this peer and transferring piece
2187     * data, try to reconnect to them sooner rather that later -- we don't
2188     * want network troubles to get in the way of a good peer. */
2189    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2190        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2191
2192    /* don't allow reconnects more often than our minimum */
2193    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2194        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2195
2196    /* otherwise, the interval depends on how many times we've tried
2197     * and failed to connect to the peer */
2198    else switch( atom->numFails ) {
2199        case 0: sec = 0; break;
2200        case 1: sec = 5; break;
2201        case 2: sec = 2 * 60; break;
2202        case 3: sec = 15 * 60; break;
2203        case 4: sec = 30 * 60; break;
2204        case 5: sec = 60 * 60; break;
2205        default: sec = 120 * 60; break;
2206    }
2207
2208    return sec;
2209}
2210
2211static struct peer_atom **
2212getPeerCandidates( Torrent * t, int * setmeSize )
2213{
2214    int                 i, atomCount, retCount;
2215    struct peer_atom ** atoms;
2216    struct peer_atom ** ret;
2217    const time_t        now = time( NULL );
2218    const int           seed = tr_torrentIsSeed( t->tor );
2219
2220    assert( torrentIsLocked( t ) );
2221
2222    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &atomCount );
2223    ret = tr_new( struct peer_atom*, atomCount );
2224    for( i = retCount = 0; i < atomCount; ++i )
2225    {
2226        int                interval;
2227        struct peer_atom * atom = atoms[i];
2228
2229        /* peer fed us too much bad data ... we only keep it around
2230         * now to weed it out in case someone sends it to us via pex */
2231        if( atom->myflags & MYFLAG_BANNED )
2232            continue;
2233
2234        /* peer was unconnectable before, so we're not going to keep trying.
2235         * this is needs a separate flag from `banned', since if they try
2236         * to connect to us later, we'll let them in */
2237        if( atom->myflags & MYFLAG_UNREACHABLE )
2238            continue;
2239
2240        /* we don't need two connections to the same peer... */
2241        if( peerIsInUse( t, &atom->addr ) )
2242            continue;
2243
2244        /* no need to connect if we're both seeds... */
2245        if( seed && ( ( atom->flags & ADDED_F_SEED_FLAG ) ||
2246                      ( atom->uploadOnly == UPLOAD_ONLY_YES ) ) )
2247            continue;
2248
2249        /* don't reconnect too often */
2250        interval = getReconnectIntervalSecs( atom );
2251        if( ( now - atom->time ) < interval )
2252        {
2253            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2254                    i, tr_peerIoAddrStr( &atom->addr, atom->port ), interval );
2255            continue;
2256        }
2257
2258        /* Don't connect to peers in our blocklist */
2259        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2260            continue;
2261
2262        ret[retCount++] = atom;
2263    }
2264
2265    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2266    *setmeSize = retCount;
2267    return ret;
2268}
2269
2270static int
2271reconnectPulse( void * vtorrent )
2272{
2273    Torrent *     t = vtorrent;
2274    static time_t prevTime = 0;
2275    static int    newConnectionsThisSecond = 0;
2276    time_t        now;
2277
2278    torrentLock( t );
2279
2280    now = time( NULL );
2281    if( prevTime != now )
2282    {
2283        prevTime = now;
2284        newConnectionsThisSecond = 0;
2285    }
2286
2287    if( !t->isRunning )
2288    {
2289        removeAllPeers( t );
2290    }
2291    else
2292    {
2293        int i, nCandidates, nBad;
2294        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
2295        struct tr_peer ** connections = getPeersToClose( t, &nBad );
2296
2297        if( nBad || nCandidates )
2298            tordbg( t, "reconnect pulse for [%s]: %d bad connections, "
2299                    "%d connection candidates, %d atoms, max per pulse is %d",
2300                    t->tor->info.name, nBad, nCandidates,
2301                    tr_ptrArraySize( t->pool ),
2302                    (int)MAX_RECONNECTIONS_PER_PULSE );
2303
2304        /* disconnect some peers.
2305           if we transferred piece data, then they might be good peers,
2306           so reset their `numFails' weight to zero.  otherwise we connected
2307           to them fruitlessly, so mark it as another fail */
2308        for( i = 0; i < nBad; ++i ) {
2309            tr_peer * peer = connections[i];
2310            struct peer_atom * atom = getExistingAtom( t, &peer->addr );
2311            if( atom->piece_data_time )
2312                atom->numFails = 0;
2313            else
2314                ++atom->numFails;
2315            tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
2316            removePeer( t, peer );
2317        }
2318
2319        /* add some new ones */
2320        for( i = 0;    ( i < nCandidates )
2321           && ( i < MAX_RECONNECTIONS_PER_PULSE )
2322           && ( getPeerCount( t ) < getMaxPeerCount( t->tor ) )
2323           && ( newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND ); ++i )
2324        {
2325            tr_peerMgr *       mgr = t->manager;
2326            struct peer_atom * atom = candidates[i];
2327            tr_peerIo *        io;
2328
2329            tordbg( t, "Starting an OUTGOING connection with %s",
2330                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2331
2332            io = tr_peerIoNewOutgoing( mgr->session, &atom->addr, atom->port, t->hash );
2333            if( io == NULL )
2334            {
2335                atom->myflags |= MYFLAG_UNREACHABLE;
2336            }
2337            else
2338            {
2339                tr_handshake * handshake = tr_handshakeNew( io,
2340                                                            mgr->session->encryptionMode,
2341                                                            myHandshakeDoneCB,
2342                                                            mgr );
2343
2344                assert( tr_peerIoGetTorrentHash( io ) );
2345
2346                ++newConnectionsThisSecond;
2347
2348                tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake,
2349                                         handshakeCompare );
2350            }
2351
2352            atom->time = time( NULL );
2353        }
2354
2355        /* cleanup */
2356        tr_free( connections );
2357        tr_free( candidates );
2358    }
2359
2360    torrentUnlock( t );
2361    return TRUE;
2362}
2363
2364/****
2365*****
2366*****  BANDWIDTH ALLOCATION
2367*****
2368****/
2369
2370static void
2371pumpAllPeers( tr_peerMgr * mgr )
2372{
2373    const int torrentCount = tr_ptrArraySize( mgr->torrents );
2374    int       i, j;
2375
2376    for( i=0; i<torrentCount; ++i )
2377    {
2378        Torrent * t = tr_ptrArrayNth( mgr->torrents, i );
2379        for( j=0; j<tr_ptrArraySize( t->peers ); ++j )
2380        {
2381            tr_peer * peer = tr_ptrArrayNth( t->peers, j );
2382            tr_peerMsgsPulse( peer->msgs );
2383        }
2384    }
2385}
2386
2387static int
2388bandwidthPulse( void * vmgr )
2389{
2390    tr_handshake * handshake;
2391    tr_peerMgr * mgr = vmgr;
2392    managerLock( mgr );
2393
2394    /* FIXME: this next line probably isn't necessary... */
2395    pumpAllPeers( mgr );
2396
2397    /* allocate bandwidth to the peers */
2398    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
2399    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
2400
2401    /* free all the finished handshakes */
2402    while(( handshake = tr_ptrArrayPop( mgr->finishedHandshakes )))
2403        tr_handshakeFree( handshake );
2404
2405    managerUnlock( mgr );
2406    return TRUE;
2407}
Note: See TracBrowser for help on using the repository browser.