source: trunk/libtransmission/peer-mgr.c @ 7646

Last change on this file since 7646 was 7646, checked in by charles, 12 years ago

(trunk libT) finally get around to making upload-slots-per-torrent a settings.json option, thanks to friendly prodding from ful in #transmission

  • Property svn:keywords set to Date Rev Author Id
File size: 68.1 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 7646 2009-01-09 15:45:44Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h> /* memcpy, memcmp, strstr */
16#include <stdlib.h> /* qsort */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "session.h"
23#include "bandwidth.h"
24#include "bencode.h"
25#include "blocklist.h"
26#include "clients.h"
27#include "completion.h"
28#include "crypto.h"
29#include "handshake.h"
30#include "inout.h" /* tr_ioTestPiece */
31#include "net.h"
32#include "peer-io.h"
33#include "peer-mgr.h"
34#include "peer-mgr-private.h"
35#include "peer-msgs.h"
36#include "ptrarray.h"
37#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
38#include "torrent.h"
39#include "trevent.h"
40#include "utils.h"
41#include "webseed.h"
42
43enum
44{
45    /* how frequently to change which peers are choked */
46    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
47
48    /* minimum interval for refilling peers' request lists */
49    REFILL_PERIOD_MSEC = 333,
50
51    /* when many peers are available, keep idle ones this long */
52    MIN_UPLOAD_IDLE_SECS = ( 30 ),
53
54    /* when few peers are available, keep idle ones this long */
55    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
56
57    /* how frequently to decide which peers live and die */
58    RECONNECT_PERIOD_MSEC = ( 2 * 1000 ),
59   
60    /* how frequently to reallocate bandwidth */
61    BANDWIDTH_PERIOD_MSEC = 500,
62
63    /* max # of peers to ask fer per torrent per reconnect pulse */
64    MAX_RECONNECTIONS_PER_PULSE = 16,
65
66    /* max number of peers to ask for per second overall.
67    * this throttle is to avoid overloading the router */
68    MAX_CONNECTIONS_PER_SECOND = 32,
69
70    /* number of bad pieces a peer is allowed to send before we ban them */
71    MAX_BAD_PIECES_PER_PEER = 5,
72
73    /* use for bitwise operations w/peer_atom.myflags */
74    MYFLAG_BANNED = 1,
75
76    /* unreachable for now... but not banned.
77     * if they try to connect to us it's okay */
78    MYFLAG_UNREACHABLE = 2,
79
80    /* the minimum we'll wait before attempting to reconnect to a peer */
81    MINIMUM_RECONNECT_INTERVAL_SECS = 5
82};
83
84
85/**
86***
87**/
88
89enum
90{
91    UPLOAD_ONLY_UKNOWN,
92    UPLOAD_ONLY_YES,
93    UPLOAD_ONLY_NO
94};
95
96/**
97 * Peer information that should be kept even before we've connected and
98 * after we've disconnected.  These are kept in a pool of peer_atoms to decide
99 * which ones would make good candidates for connecting to, and to watch out
100 * for banned peers.
101 *
102 * @see tr_peer
103 * @see tr_peermsgs
104 */
105struct peer_atom
106{
107    uint8_t     from;
108    uint8_t     flags;       /* these match the added_f flags */
109    uint8_t     myflags;     /* flags that aren't defined in added_f */
110    uint8_t     uploadOnly;  /* UPLOAD_ONLY_ */
111    tr_port     port;
112    uint16_t    numFails;
113    tr_address  addr;
114    time_t      time;        /* when the peer's connection status last changed */
115    time_t      piece_data_time;
116};
117
118typedef struct
119{
120    tr_bool         isRunning;
121
122    uint8_t         hash[SHA_DIGEST_LENGTH];
123    int         *   pendingRequestCount;
124    tr_ptrArray     outgoingHandshakes; /* tr_handshake */
125    tr_ptrArray     pool; /* struct peer_atom */
126    tr_ptrArray     peers; /* tr_peer */
127    tr_ptrArray     webseeds; /* tr_webseed */
128    tr_timer *      reconnectTimer;
129    tr_timer *      rechokeTimer;
130    tr_timer *      refillTimer;
131    tr_torrent *    tor;
132    tr_peer *       optimistic; /* the optimistic peer, or NULL if none */
133
134    struct tr_peerMgr * manager;
135}
136Torrent;
137
138struct tr_peerMgr
139{
140    tr_session      * session;
141    tr_ptrArray       torrents; /* Torrent */
142    tr_ptrArray       incomingHandshakes; /* tr_handshake */
143    tr_timer        * bandwidthTimer;
144};
145
146#define tordbg( t, ... ) \
147    do { \
148        if( tr_deepLoggingIsActive( ) ) \
149            tr_deepLog( __FILE__, __LINE__, t->tor->info.name, __VA_ARGS__ ); \
150    } while( 0 )
151
152#define dbgmsg( ... ) \
153    do { \
154        if( tr_deepLoggingIsActive( ) ) \
155            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
156    } while( 0 )
157
158/**
159***
160**/
161
162static inline void
163managerLock( const struct tr_peerMgr * manager )
164{
165    tr_globalLock( manager->session );
166}
167
168static inline void
169managerUnlock( const struct tr_peerMgr * manager )
170{
171    tr_globalUnlock( manager->session );
172}
173
174static inline void
175torrentLock( Torrent * torrent )
176{
177    managerLock( torrent->manager );
178}
179
180static inline void
181torrentUnlock( Torrent * torrent )
182{
183    managerUnlock( torrent->manager );
184}
185
186static inline int
187torrentIsLocked( const Torrent * t )
188{
189    return tr_globalIsLocked( t->manager->session );
190}
191
192/**
193***
194**/
195
196static int
197handshakeCompareToAddr( const void * va, const void * vb )
198{
199    const tr_handshake * a = va;
200
201    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
202}
203
204static int
205handshakeCompare( const void * a, const void * b )
206{
207    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
208}
209
210static tr_handshake*
211getExistingHandshake( tr_ptrArray      * handshakes,
212                      const tr_address * addr )
213{
214    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
215}
216
217static int
218comparePeerAtomToAddress( const void * va, const void * vb )
219{
220    const struct peer_atom * a = va;
221
222    return tr_compareAddresses( &a->addr, vb );
223}
224
225static int
226comparePeerAtoms( const void * va, const void * vb )
227{
228    const struct peer_atom * b = vb;
229
230    return comparePeerAtomToAddress( va, &b->addr );
231}
232
233/**
234***
235**/
236
237static int
238torrentCompare( const void * va,
239                const void * vb )
240{
241    const Torrent * a = va;
242    const Torrent * b = vb;
243
244    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
245}
246
247static int
248torrentCompareToHash( const void * va,
249                      const void * vb )
250{
251    const Torrent * a = va;
252    const uint8_t * b_hash = vb;
253
254    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
255}
256
257static Torrent*
258getExistingTorrent( tr_peerMgr *    manager,
259                    const uint8_t * hash )
260{
261    return (Torrent*) tr_ptrArrayFindSorted( &manager->torrents,
262                                             hash,
263                                             torrentCompareToHash );
264}
265
266static int
267peerCompare( const void * va, const void * vb )
268{
269    const tr_peer * a = va;
270    const tr_peer * b = vb;
271
272    return tr_compareAddresses( &a->addr, &b->addr );
273}
274
275static int
276peerCompareToAddr( const void * va, const void * vb )
277{
278    const tr_peer * a = va;
279
280    return tr_compareAddresses( &a->addr, vb );
281}
282
283static tr_peer*
284getExistingPeer( Torrent          * torrent,
285                 const tr_address * addr )
286{
287    assert( torrentIsLocked( torrent ) );
288    assert( addr );
289
290    return tr_ptrArrayFindSorted( &torrent->peers, addr, peerCompareToAddr );
291}
292
293static struct peer_atom*
294getExistingAtom( const Torrent    * t,
295                 const tr_address * addr )
296{
297    Torrent * tt = (Torrent*)t;
298    assert( torrentIsLocked( t ) );
299    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
300}
301
302static tr_bool
303peerIsInUse( const Torrent    * ct,
304             const tr_address * addr )
305{
306    Torrent * t = (Torrent*) ct;
307
308    assert( torrentIsLocked ( t ) );
309
310    return getExistingPeer( t, addr )
311        || getExistingHandshake( &t->outgoingHandshakes, addr )
312        || getExistingHandshake( &t->manager->incomingHandshakes, addr );
313}
314
315static tr_peer*
316peerConstructor( const tr_address * addr )
317{
318    tr_peer * p;
319    p = tr_new0( tr_peer, 1 );
320    p->addr = *addr;
321    return p;
322}
323
324static tr_peer*
325getPeer( Torrent          * torrent,
326         const tr_address * addr )
327{
328    tr_peer * peer;
329
330    assert( torrentIsLocked( torrent ) );
331
332    peer = getExistingPeer( torrent, addr );
333
334    if( peer == NULL )
335    {
336        peer = peerConstructor( addr );
337        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
338    }
339
340    return peer;
341}
342
343static void
344peerDestructor( tr_peer * peer )
345{
346    assert( peer );
347
348    if( peer->msgs != NULL )
349    {
350        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
351        tr_peerMsgsFree( peer->msgs );
352    }
353
354    tr_peerIoClear( peer->io );
355    tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
356
357    tr_bitfieldFree( peer->have );
358    tr_bitfieldFree( peer->blame );
359    tr_free( peer->client );
360
361    tr_free( peer );
362}
363
364static void
365removePeer( Torrent * t,
366            tr_peer * peer )
367{
368    tr_peer *          removed;
369    struct peer_atom * atom;
370
371    assert( torrentIsLocked( t ) );
372
373    atom = getExistingAtom( t, &peer->addr );
374    assert( atom );
375    atom->time = time( NULL );
376
377    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
378    assert( removed == peer );
379    peerDestructor( removed );
380}
381
382static void
383removeAllPeers( Torrent * t )
384{
385    while( !tr_ptrArrayEmpty( &t->peers ) )
386        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
387}
388
389static void
390torrentDestructor( void * vt )
391{
392    Torrent * t = vt;
393    uint8_t   hash[SHA_DIGEST_LENGTH];
394
395    assert( t );
396    assert( !t->isRunning );
397    assert( torrentIsLocked( t ) );
398    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
399    assert( tr_ptrArrayEmpty( &t->peers ) );
400
401    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
402
403    tr_timerFree( &t->reconnectTimer );
404    tr_timerFree( &t->rechokeTimer );
405    tr_timerFree( &t->refillTimer );
406
407    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
408    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
409    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
410    tr_ptrArrayDestruct( &t->peers, NULL );
411
412    tr_free( t->pendingRequestCount );
413    tr_free( t );
414}
415
416static void peerCallbackFunc( void * vpeer,
417                              void * vevent,
418                              void * vt );
419
420static Torrent*
421torrentConstructor( tr_peerMgr * manager,
422                    tr_torrent * tor )
423{
424    int       i;
425    Torrent * t;
426
427    t = tr_new0( Torrent, 1 );
428    t->manager = manager;
429    t->tor = tor;
430    t->pool = TR_PTR_ARRAY_INIT;
431    t->peers = TR_PTR_ARRAY_INIT;
432    t->webseeds = TR_PTR_ARRAY_INIT;
433    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
434    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
435
436    for( i = 0; i < tor->info.webseedCount; ++i )
437    {
438        tr_webseed * w =
439            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
440        tr_ptrArrayAppend( &t->webseeds, w );
441    }
442
443    return t;
444}
445
446
447static int bandwidthPulse( void * vmgr );
448
449
450tr_peerMgr*
451tr_peerMgrNew( tr_session * session )
452{
453    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
454
455    m->session = session;
456    m->torrents = TR_PTR_ARRAY_INIT;
457    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
458    m->bandwidthTimer = tr_timerNew( session, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
459    return m;
460}
461
462void
463tr_peerMgrFree( tr_peerMgr * manager )
464{
465    managerLock( manager );
466
467    tr_timerFree( &manager->bandwidthTimer );
468
469    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
470     * the item from manager->handshakes, so this is a little roundabout... */
471    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
472        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
473
474    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
475
476    /* free the torrents. */
477    tr_ptrArrayDestruct( &manager->torrents, torrentDestructor );
478
479    managerUnlock( manager );
480    tr_free( manager );
481}
482
483static int
484clientIsDownloadingFrom( const tr_peer * peer )
485{
486    return peer->clientIsInterested && !peer->clientIsChoked;
487}
488
489static int
490clientIsUploadingTo( const tr_peer * peer )
491{
492    return peer->peerIsInterested && !peer->peerIsChoked;
493}
494
495/***
496****
497***/
498
499tr_bool
500tr_peerMgrPeerIsSeed( const tr_peerMgr  * mgr,
501                      const uint8_t     * torrentHash,
502                      const tr_address  * addr )
503{
504    tr_bool isSeed = FALSE;
505    const Torrent * t = NULL;
506    const struct peer_atom * atom = NULL;
507
508    t = getExistingTorrent( (tr_peerMgr*)mgr, torrentHash );
509    if( t )
510        atom = getExistingAtom( t, addr );
511    if( atom )
512        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
513
514    return isSeed;
515}
516
517/****
518*****
519*****  REFILL
520*****
521****/
522
523static void
524assertValidPiece( Torrent * t, tr_piece_index_t piece )
525{
526    assert( t );
527    assert( t->tor );
528    assert( piece < t->tor->info.pieceCount );
529}
530
531static int
532getPieceRequests( Torrent * t, tr_piece_index_t piece )
533{
534    assertValidPiece( t, piece );
535
536    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
537}
538
539static void
540incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
541{
542    assertValidPiece( t, piece );
543
544    if( t->pendingRequestCount == NULL )
545        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
546    t->pendingRequestCount[piece]++;
547}
548
549static void
550decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
551{
552    assertValidPiece( t, piece );
553
554    if( t->pendingRequestCount )
555        t->pendingRequestCount[piece]--;
556}
557
558struct tr_refill_piece
559{
560    tr_priority_t    priority;
561    uint32_t         piece;
562    uint32_t         peerCount;
563    int              random;
564    int              pendingRequestCount;
565    int              missingBlockCount;
566};
567
568static int
569compareRefillPiece( const void * aIn, const void * bIn )
570{
571    const struct tr_refill_piece * a = aIn;
572    const struct tr_refill_piece * b = bIn;
573
574    /* if one piece has a higher priority, it goes first */
575    if( a->priority != b->priority )
576        return a->priority > b->priority ? -1 : 1;
577
578    /* have a per-priority endgame */
579    if( a->pendingRequestCount != b->pendingRequestCount )
580        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
581
582    /* fewer missing pieces goes first */
583    if( a->missingBlockCount != b->missingBlockCount )
584        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
585
586    /* otherwise if one has fewer peers, it goes first */
587    if( a->peerCount != b->peerCount )
588        return a->peerCount < b->peerCount ? -1 : 1;
589
590    /* otherwise go with our random seed */
591    if( a->random != b->random )
592        return a->random < b->random ? -1 : 1;
593
594    return 0;
595}
596
597static tr_piece_index_t *
598getPreferredPieces( Torrent * t, tr_piece_index_t * pieceCount )
599{
600    const tr_torrent  * tor = t->tor;
601    const tr_info     * inf = &tor->info;
602    tr_piece_index_t    i;
603    tr_piece_index_t    poolSize = 0;
604    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
605    int                 peerCount;
606    const tr_peer    ** peers;
607
608    assert( torrentIsLocked( t ) );
609
610    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
611    peerCount = tr_ptrArraySize( &t->peers );
612
613    /* make a list of the pieces that we want but don't have */
614    for( i = 0; i < inf->pieceCount; ++i )
615        if( !tor->info.pieces[i].dnd
616                && !tr_cpPieceIsComplete( &tor->completion, i ) )
617            pool[poolSize++] = i;
618
619    /* sort the pool by which to request next */
620    if( poolSize > 1 )
621    {
622        tr_piece_index_t j;
623        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
624
625        for( j = 0; j < poolSize; ++j )
626        {
627            int k;
628            const tr_piece_index_t piece = pool[j];
629            struct tr_refill_piece * setme = p + j;
630
631            setme->piece = piece;
632            setme->priority = inf->pieces[piece].priority;
633            setme->peerCount = 0;
634            setme->random = tr_cryptoWeakRandInt( INT_MAX );
635            setme->pendingRequestCount = getPieceRequests( t, piece );
636            setme->missingBlockCount
637                         = tr_cpMissingBlocksInPiece( &tor->completion, piece );
638
639            for( k = 0; k < peerCount; ++k )
640            {
641                const tr_peer * peer = peers[k];
642                if( peer->peerIsInterested
643                        && !peer->clientIsChoked
644                        && tr_bitfieldHas( peer->have, piece ) )
645                    ++setme->peerCount;
646            }
647        }
648
649        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
650               compareRefillPiece );
651
652        for( j = 0; j < poolSize; ++j )
653            pool[j] = p[j].piece;
654
655        tr_free( p );
656    }
657
658    *pieceCount = poolSize;
659    return pool;
660}
661
662struct tr_blockIterator
663{
664    Torrent * t;
665    tr_block_index_t blockIndex, blockCount, *blocks;
666    tr_piece_index_t pieceIndex, pieceCount, *pieces;
667};
668
669static struct tr_blockIterator*
670blockIteratorNew( Torrent * t )
671{
672    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
673    i->t = t;
674    i->pieces = getPreferredPieces( t, &i->pieceCount );
675    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCountInPiece );
676    return i;
677}
678
679static int
680blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
681{
682    int found;
683    Torrent * t = i->t;
684    tr_torrent * tor = t->tor;
685
686    while( ( i->blockIndex == i->blockCount )
687        && ( i->pieceIndex < i->pieceCount ) )
688    {
689        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
690        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
691        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
692        tr_block_index_t block;
693
694        assert( index < tor->info.pieceCount );
695
696        i->blockCount = 0;
697        i->blockIndex = 0;
698        for( block=b; block!=e; ++block )
699            if( !tr_cpBlockIsComplete( &tor->completion, block ) )
700                i->blocks[i->blockCount++] = block;
701    }
702
703    assert( i->blockCount <= tor->blockCountInPiece );
704
705    if(( found = ( i->blockIndex < i->blockCount )))
706        *setme = i->blocks[i->blockIndex++];
707
708    return found;
709}
710
711static void
712blockIteratorFree( struct tr_blockIterator * i )
713{
714    tr_free( i->blocks );
715    tr_free( i->pieces );
716    tr_free( i );
717}
718
719static tr_peer**
720getPeersUploadingToClient( Torrent * t,
721                           int *     setmeCount )
722{
723    int j;
724    int peerCount = 0;
725    int retCount = 0;
726    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
727    tr_peer ** ret = tr_new( tr_peer *, peerCount );
728
729    j = 0; /* this is a temporary test to make sure we walk through all the peers */
730    if( peerCount )
731    {
732        /* Get a list of peers we're downloading from.
733           Pick a different starting point each time so all peers
734           get a chance at being the first in line */
735        const int fencepost = tr_cryptoWeakRandInt( peerCount );
736        int i = fencepost;
737        do {
738            if( clientIsDownloadingFrom( peers[i] ) )
739                ret[retCount++] = peers[i];
740            i = ( i + 1 ) % peerCount;
741            ++j;
742        } while( i != fencepost );
743    }
744    assert( j == peerCount );
745    *setmeCount = retCount;
746    return ret;
747}
748
749static uint32_t
750getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
751{
752    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
753    const uint64_t blockPos = tor->blockSize * b;
754    assert( blockPos >= piecePos );
755    return (uint32_t)( blockPos - piecePos );
756}
757
758static int
759refillPulse( void * vtorrent )
760{
761    tr_block_index_t block;
762    int peerCount;
763    int webseedCount;
764    tr_peer ** peers;
765    tr_webseed ** webseeds;
766    struct tr_blockIterator * blockIterator;
767    Torrent * t = vtorrent;
768    tr_torrent * tor = t->tor;
769
770    if( !t->isRunning )
771        return TRUE;
772    if( tr_torrentIsSeed( t->tor ) )
773        return TRUE;
774
775    torrentLock( t );
776    tordbg( t, "Refilling Request Buffers..." );
777
778    blockIterator = blockIteratorNew( t );
779    peers = getPeersUploadingToClient( t, &peerCount );
780    webseedCount = tr_ptrArraySize( &t->webseeds );
781    webseeds = tr_memdup( tr_ptrArrayBase( &t->webseeds ),
782                          webseedCount * sizeof( tr_webseed* ) );
783
784    while( ( webseedCount || peerCount )
785        && blockIteratorNext( blockIterator, &block ) )
786    {
787        int j;
788        int handled = FALSE;
789
790        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
791        const uint32_t offset = getBlockOffsetInPiece( tor, block );
792        const uint32_t length = tr_torBlockCountBytes( tor, block );
793
794        /* find a peer who can ask for this block */
795        for( j=0; !handled && j<peerCount; )
796        {
797            const tr_addreq_t val = tr_peerMsgsAddRequest( peers[j]->msgs, index, offset, length );
798            switch( val )
799            {
800                case TR_ADDREQ_FULL:
801                case TR_ADDREQ_CLIENT_CHOKED:
802                    peers[j] = peers[--peerCount];
803                    break;
804
805                case TR_ADDREQ_MISSING:
806                case TR_ADDREQ_DUPLICATE:
807                    ++j;
808                    break;
809
810                case TR_ADDREQ_OK:
811                    incrementPieceRequests( t, index );
812                    handled = TRUE;
813                    break;
814
815                default:
816                    assert( 0 && "unhandled value" );
817                    break;
818            }
819        }
820
821        /* maybe one of the webseeds can do it */
822        for( j=0; !handled && j<webseedCount; )
823        {
824            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j], index, offset, length );
825            switch( val )
826            {
827                case TR_ADDREQ_FULL:
828                    webseeds[j] = webseeds[--webseedCount];
829                    break;
830
831                case TR_ADDREQ_OK:
832                    incrementPieceRequests( t, index );
833                    handled = TRUE;
834                    break;
835
836                default:
837                    assert( 0 && "unhandled value" );
838                    break;
839            }
840        }
841    }
842
843    /* cleanup */
844    blockIteratorFree( blockIterator );
845    tr_free( webseeds );
846    tr_free( peers );
847
848    t->refillTimer = NULL;
849    torrentUnlock( t );
850    return FALSE;
851}
852
853static void
854broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
855{
856    size_t i;
857    size_t peerCount;
858    tr_peer ** peers;
859
860    assert( torrentIsLocked( t ) );
861
862    tordbg( t, "got a block; cancelling any duplicate requests from peers %"PRIu32":%"PRIu32"->%"PRIu32, index, offset, length );
863
864    peerCount = tr_ptrArraySize( &t->peers );
865    peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
866    for( i=0; i<peerCount; ++i )
867        if( peers[i]->msgs )
868            tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
869}
870
871static void
872addStrike( Torrent * t,
873           tr_peer * peer )
874{
875    tordbg( t, "increasing peer %s strike count to %d",
876            tr_peerIoAddrStr( &peer->addr,
877                              peer->port ), peer->strikes + 1 );
878
879    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
880    {
881        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
882        atom->myflags |= MYFLAG_BANNED;
883        peer->doPurge = 1;
884        tordbg( t, "banning peer %s", tr_peerIoAddrStr( &atom->addr, atom->port ) );
885    }
886}
887
888static void
889gotBadPiece( Torrent *        t,
890             tr_piece_index_t pieceIndex )
891{
892    tr_torrent *   tor = t->tor;
893    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
894
895    tor->corruptCur += byteCount;
896    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
897}
898
899static void
900refillSoon( Torrent * t )
901{
902    if( t->refillTimer == NULL )
903        t->refillTimer = tr_timerNew( t->manager->session,
904                                      refillPulse, t,
905                                      REFILL_PERIOD_MSEC );
906}
907
908static void
909peerSuggestedPiece( Torrent            * t UNUSED,
910                    tr_peer            * peer UNUSED,
911                    tr_piece_index_t     pieceIndex UNUSED,
912                    int                  isFastAllowed UNUSED )
913{
914#if 0
915    assert( t );
916    assert( peer );
917    assert( peer->msgs );
918
919    /* is this a valid piece? */
920    if(  pieceIndex >= t->tor->info.pieceCount )
921        return;
922
923    /* don't ask for it if we've already got it */
924    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
925        return;
926
927    /* don't ask for it if they don't have it */
928    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
929        return;
930
931    /* don't ask for it if we're choked and it's not fast */
932    if( !isFastAllowed && peer->clientIsChoked )
933        return;
934
935    /* request the blocks that we don't have in this piece */
936    {
937        tr_block_index_t block;
938        const tr_torrent * tor = t->tor;
939        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
940        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
941
942        for( block=start; block<end; ++block )
943        {
944            if( !tr_cpBlockIsComplete( tor->completion, block ) )
945            {
946                const uint32_t offset = getBlockOffsetInPiece( tor, block );
947                const uint32_t length = tr_torBlockCountBytes( tor, block );
948                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
949                incrementPieceRequests( t, pieceIndex );
950            }
951        }
952    }
953#endif
954}
955
956static void
957peerCallbackFunc( void * vpeer, void * vevent, void * vt )
958{
959    tr_peer * peer = vpeer; /* may be NULL if peer is a webseed */
960    Torrent * t = vt;
961    const tr_peer_event * e = vevent;
962
963    torrentLock( t );
964
965    switch( e->eventType )
966    {
967        case TR_PEER_UPLOAD_ONLY:
968            /* update our atom */
969            if( peer ) {
970                struct peer_atom * a = getExistingAtom( t, &peer->addr );
971                a->uploadOnly = e->uploadOnly ? UPLOAD_ONLY_YES : UPLOAD_ONLY_NO;
972            }
973            break;
974
975        case TR_PEER_NEED_REQ:
976            refillSoon( t );
977            break;
978
979        case TR_PEER_CANCEL:
980            decrementPieceRequests( t, e->pieceIndex );
981            break;
982
983        case TR_PEER_PEER_GOT_DATA:
984        {
985            const time_t now = time( NULL );
986            tr_torrent * tor = t->tor;
987
988            tor->activityDate = now;
989
990            if( e->wasPieceData )
991                tor->uploadedCur += e->length;
992
993            /* update the stats */
994            if( e->wasPieceData )
995                tr_statsAddUploaded( tor->session, e->length );
996
997            /* update our atom */
998            if( peer ) {
999                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1000                if( e->wasPieceData )
1001                    a->piece_data_time = now;
1002            }
1003
1004            break;
1005        }
1006
1007        case TR_PEER_CLIENT_GOT_SUGGEST:
1008            if( peer )
1009                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1010            break;
1011
1012        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1013            if( peer )
1014                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1015            break;
1016
1017        case TR_PEER_CLIENT_GOT_DATA:
1018        {
1019            const time_t now = time( NULL );
1020            tr_torrent * tor = t->tor;
1021            tor->activityDate = now;
1022
1023            /* only add this to downloadedCur if we got it from a peer --
1024             * webseeds shouldn't count against our ratio.  As one tracker
1025             * admin put it, "Those pieces are downloaded directly from the
1026             * content distributor, not the peers, it is the tracker's job
1027             * to manage the swarms, not the web server and does not fit
1028             * into the jurisdiction of the tracker." */
1029            if( peer && e->wasPieceData )
1030                tor->downloadedCur += e->length;
1031
1032            /* update the stats */ 
1033            if( e->wasPieceData )
1034                tr_statsAddDownloaded( tor->session, e->length );
1035
1036            /* update our atom */
1037            if( peer ) {
1038                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1039                if( e->wasPieceData )
1040                    a->piece_data_time = now;
1041            }
1042
1043            break;
1044        }
1045
1046        case TR_PEER_PEER_PROGRESS:
1047        {
1048            if( peer )
1049            {
1050                struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1051                const int peerIsSeed = e->progress >= 1.0;
1052                if( peerIsSeed ) {
1053                    tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1054                    atom->flags |= ADDED_F_SEED_FLAG;
1055                } else {
1056                    tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1057                    atom->flags &= ~ADDED_F_SEED_FLAG;
1058                }
1059            }
1060            break;
1061        }
1062
1063        case TR_PEER_CLIENT_GOT_BLOCK:
1064        {
1065            tr_torrent *     tor = t->tor;
1066
1067            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1068
1069            tr_cpBlockAdd( &tor->completion, block );
1070            decrementPieceRequests( t, e->pieceIndex );
1071
1072            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1073
1074            if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1075            {
1076                const tr_piece_index_t p = e->pieceIndex;
1077                const tr_bool ok = tr_ioTestPiece( tor, p, NULL, 0 );
1078
1079                if( !ok )
1080                {
1081                    tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1082                               (unsigned long)p );
1083                }
1084
1085                tr_torrentSetHasPiece( tor, p, ok );
1086                tr_torrentSetPieceChecked( tor, p, TRUE );
1087                tr_peerMgrSetBlame( tor->session->peerMgr, tor->info.hash, p, ok );
1088
1089                if( !ok )
1090                    gotBadPiece( t, p );
1091                else
1092                {
1093                    int i;
1094                    int peerCount = tr_ptrArraySize( &t->peers );
1095                    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1096                    for( i=0; i<peerCount; ++i )
1097                        tr_peerMsgsHave( peers[i]->msgs, p );
1098                }
1099
1100                tr_torrentRecheckCompleteness( tor );
1101            }
1102            break;
1103        }
1104
1105        case TR_PEER_ERROR:
1106            if( e->err == EINVAL )
1107            {
1108                addStrike( t, peer );
1109                peer->doPurge = 1;
1110                tordbg( t, "setting %s doPurge flag because we got an EINVAL error", tr_peerIoAddrStr( &peer->addr, peer->port ) );
1111            }
1112            else if( ( e->err == ERANGE )
1113                  || ( e->err == EMSGSIZE )
1114                  || ( e->err == ENOTCONN ) )
1115            {
1116                /* some protocol error from the peer */
1117                peer->doPurge = 1;
1118                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error", tr_peerIoAddrStr( &peer->addr, peer->port ) );
1119            }
1120            else /* a local error, such as an IO error */
1121            {
1122                t->tor->error = e->err;
1123                tr_strlcpy( t->tor->errorString,
1124                            tr_strerror( t->tor->error ),
1125                            sizeof( t->tor->errorString ) );
1126                tr_torrentStop( t->tor );
1127            }
1128            break;
1129
1130        default:
1131            assert( 0 );
1132    }
1133
1134    torrentUnlock( t );
1135}
1136
1137static void
1138ensureAtomExists( Torrent          * t,
1139                  const tr_address * addr,
1140                  tr_port            port,
1141                  uint8_t            flags,
1142                  uint8_t            from )
1143{
1144    if( getExistingAtom( t, addr ) == NULL )
1145    {
1146        struct peer_atom * a;
1147        a = tr_new0( struct peer_atom, 1 );
1148        a->addr = *addr;
1149        a->port = port;
1150        a->flags = flags;
1151        a->from = from;
1152        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr( &a->addr, a->port ) );
1153        tr_ptrArrayInsertSorted( &t->pool, a, comparePeerAtoms );
1154    }
1155}
1156
1157static int
1158getMaxPeerCount( const tr_torrent * tor )
1159{
1160    return tor->maxConnectedPeers;
1161}
1162
1163static int
1164getPeerCount( const Torrent * t )
1165{
1166    return tr_ptrArraySize( &t->peers );// + tr_ptrArraySize( &t->outgoingHandshakes );
1167}
1168
1169/* FIXME: this is kind of a mess. */
1170static tr_bool
1171myHandshakeDoneCB( tr_handshake  * handshake,
1172                   tr_peerIo     * io,
1173                   int             isConnected,
1174                   const uint8_t * peer_id,
1175                   void          * vmanager )
1176{
1177    tr_bool            ok = isConnected;
1178    tr_bool            success = FALSE;
1179    tr_port            port;
1180    const tr_address * addr;
1181    tr_peerMgr       * manager = vmanager;
1182    Torrent          * t;
1183    tr_handshake     * ours;
1184
1185    assert( io );
1186    assert( isConnected == 0 || isConnected == 1 );
1187
1188    t = tr_peerIoHasTorrentHash( io )
1189        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1190        : NULL;
1191
1192    if( tr_peerIoIsIncoming ( io ) )
1193        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1194                                        handshake, handshakeCompare );
1195    else if( t )
1196        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1197                                        handshake, handshakeCompare );
1198    else
1199        ours = handshake;
1200
1201    assert( ours );
1202    assert( ours == handshake );
1203
1204    if( t )
1205        torrentLock( t );
1206
1207    addr = tr_peerIoGetAddress( io, &port );
1208
1209    if( !ok || !t || !t->isRunning )
1210    {
1211        if( t )
1212        {
1213            struct peer_atom * atom = getExistingAtom( t, addr );
1214            if( atom )
1215                ++atom->numFails;
1216        }
1217    }
1218    else /* looking good */
1219    {
1220        struct peer_atom * atom;
1221        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1222        atom = getExistingAtom( t, addr );
1223        atom->time = time( NULL );
1224        atom->piece_data_time = 0;
1225
1226        if( atom->myflags & MYFLAG_BANNED )
1227        {
1228            tordbg( t, "banned peer %s tried to reconnect",
1229                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
1230        }
1231        else if( tr_peerIoIsIncoming( io )
1232               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1233
1234        {
1235        }
1236        else
1237        {
1238            tr_peer * peer = getExistingPeer( t, addr );
1239
1240            if( peer ) /* we already have this peer */
1241            {
1242            }
1243            else
1244            {
1245                peer = getPeer( t, addr );
1246                tr_free( peer->client );
1247
1248                if( !peer_id )
1249                    peer->client = NULL;
1250                else {
1251                    char client[128];
1252                    tr_clientForId( client, sizeof( client ), peer_id );
1253                    peer->client = tr_strdup( client );
1254                }
1255
1256                peer->port = port;
1257                peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is
1258                                                                balanced by our unref in peerDestructor()  */
1259                tr_peerIoSetParent( peer->io, t->tor->bandwidth );
1260                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1261
1262                success = TRUE;
1263            }
1264        }
1265    }
1266
1267    if( t )
1268        torrentUnlock( t );
1269
1270    return success;
1271}
1272
1273void
1274tr_peerMgrAddIncoming( tr_peerMgr * manager,
1275                       tr_address * addr,
1276                       tr_port      port,
1277                       int          socket )
1278{
1279    managerLock( manager );
1280
1281    if( tr_sessionIsAddressBlocked( manager->session, addr ) )
1282    {
1283        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
1284        tr_netClose( socket );
1285    }
1286    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
1287    {
1288        tr_netClose( socket );
1289    }
1290    else /* we don't have a connetion to them yet... */
1291    {
1292        tr_peerIo *    io;
1293        tr_handshake * handshake;
1294
1295        io = tr_peerIoNewIncoming( manager->session, manager->session->bandwidth, addr, port, socket );
1296
1297        handshake = tr_handshakeNew( io,
1298                                     manager->session->encryptionMode,
1299                                     myHandshakeDoneCB,
1300                                     manager );
1301
1302        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
1303
1304        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
1305                                 handshakeCompare );
1306    }
1307
1308    managerUnlock( manager );
1309}
1310
1311static tr_bool
1312tr_isPex( const tr_pex * pex )
1313{
1314    return pex && tr_isAddress( &pex->addr );
1315}
1316
1317void
1318tr_peerMgrAddPex( tr_peerMgr *    manager,
1319                  const uint8_t * torrentHash,
1320                  uint8_t         from,
1321                  const tr_pex *  pex )
1322{
1323    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
1324    {
1325        Torrent * t;
1326        managerLock( manager );
1327
1328        t = getExistingTorrent( manager, torrentHash );
1329        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
1330            ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from );
1331
1332        managerUnlock( manager );
1333    }
1334}
1335
1336tr_pex *
1337tr_peerMgrCompactToPex( const void *    compact,
1338                        size_t          compactLen,
1339                        const uint8_t * added_f,
1340                        size_t          added_f_len,
1341                        size_t *        pexCount )
1342{
1343    size_t          i;
1344    size_t          n = compactLen / 6;
1345    const uint8_t * walk = compact;
1346    tr_pex *        pex = tr_new0( tr_pex, n );
1347
1348    for( i = 0; i < n; ++i )
1349    {
1350        pex[i].addr.type = TR_AF_INET;
1351        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
1352        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1353        if( added_f && ( n == added_f_len ) )
1354            pex[i].flags = added_f[i];
1355    }
1356
1357    *pexCount = n;
1358    return pex;
1359}
1360
1361tr_pex *
1362tr_peerMgrCompact6ToPex( const void    * compact,
1363                         size_t          compactLen,
1364                         const uint8_t * added_f,
1365                         size_t          added_f_len,
1366                         size_t        * pexCount )
1367{
1368    size_t          i;
1369    size_t          n = compactLen / 18;
1370    const uint8_t * walk = compact;
1371    tr_pex *        pex = tr_new0( tr_pex, n );
1372   
1373    for( i = 0; i < n; ++i )
1374    {
1375        pex[i].addr.type = TR_AF_INET6;
1376        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
1377        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1378        if( added_f && ( n == added_f_len ) )
1379            pex[i].flags = added_f[i];
1380    }
1381   
1382    *pexCount = n;
1383    return pex;
1384}
1385
1386tr_pex *
1387tr_peerMgrArrayToPex( const void * array,
1388                      size_t       arrayLen,
1389                      size_t      * pexCount )
1390{
1391    size_t          i;
1392    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
1393    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
1394    const uint8_t * walk = array;
1395    tr_pex        * pex = tr_new0( tr_pex, n );
1396   
1397    for( i = 0 ; i < n ; i++ ) {
1398        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
1399        tr_suspectAddress( &pex[i].addr, "tracker"  );
1400        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
1401        pex[i].flags = 0x00;
1402        walk += sizeof( tr_address ) + 2;
1403    }
1404   
1405    *pexCount = n;
1406    return pex;
1407}
1408
1409/**
1410***
1411**/
1412
1413void
1414tr_peerMgrSetBlame( tr_peerMgr *     manager,
1415                    const uint8_t *  torrentHash,
1416                    tr_piece_index_t pieceIndex,
1417                    int              success )
1418{
1419    if( !success )
1420    {
1421        int        peerCount, i;
1422        Torrent *  t = getExistingTorrent( manager, torrentHash );
1423        tr_peer ** peers;
1424
1425        assert( torrentIsLocked( t ) );
1426
1427        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
1428        for( i = 0; i < peerCount; ++i )
1429        {
1430            tr_peer * peer = peers[i];
1431            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1432            {
1433                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1434                        tr_peerIoAddrStr( &peer->addr, peer->port ),
1435                        pieceIndex, (int)peer->strikes + 1 );
1436                addStrike( t, peer );
1437            }
1438        }
1439    }
1440}
1441
1442int
1443tr_pexCompare( const void * va, const void * vb )
1444{
1445    const tr_pex * a = va;
1446    const tr_pex * b = vb;
1447    int i;
1448
1449    assert( tr_isPex( a ) );
1450    assert( tr_isPex( b ) );
1451
1452    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
1453        return i;
1454
1455    if( a->port != b->port )
1456        return a->port < b->port ? -1 : 1;
1457
1458    return 0;
1459}
1460
1461static int
1462peerPrefersCrypto( const tr_peer * peer )
1463{
1464    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1465        return TRUE;
1466
1467    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1468        return FALSE;
1469
1470    return tr_peerIoIsEncrypted( peer->io );
1471}
1472
1473int
1474tr_peerMgrGetPeers( tr_peerMgr      * manager,
1475                    const uint8_t   * torrentHash,
1476                    tr_pex         ** setme_pex,
1477                    uint8_t           af)
1478{
1479    int peersReturning = 0;
1480    const Torrent *  t;
1481
1482    managerLock( manager );
1483
1484    t = getExistingTorrent( manager, torrentHash );
1485    if( t == NULL )
1486    {
1487        *setme_pex = NULL;
1488    }
1489    else
1490    {
1491        int i;
1492        const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1493        const int peerCount = tr_ptrArraySize( &t->peers );
1494        /* for now, this will waste memory on torrents that have both
1495         * ipv6 and ipv4 peers */
1496        tr_pex * pex = tr_new( tr_pex, peerCount );
1497        tr_pex * walk = pex;
1498
1499        for( i=0; i<peerCount; ++i )
1500        {
1501            const tr_peer * peer = peers[i];
1502            if( peer->addr.type == af )
1503            {
1504                const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1505
1506                assert( tr_isAddress( &peer->addr ) );
1507                walk->addr = peer->addr;
1508                walk->port = peer->port;
1509                walk->flags = 0;
1510                if( peerPrefersCrypto( peer ) )
1511                    walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1512                if( ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 ) )
1513                    walk->flags |= ADDED_F_SEED_FLAG;
1514                ++peersReturning;
1515                ++walk;
1516            }
1517        }
1518
1519        assert( ( walk - pex ) == peersReturning );
1520        qsort( pex, peersReturning, sizeof( tr_pex ), tr_pexCompare );
1521        *setme_pex = pex;
1522    }
1523
1524    managerUnlock( manager );
1525    return peersReturning;
1526}
1527
1528static int reconnectPulse( void * vtorrent );
1529
1530static int rechokePulse( void * vtorrent );
1531
1532void
1533tr_peerMgrStartTorrent( tr_peerMgr *    manager,
1534                        const uint8_t * torrentHash )
1535{
1536    Torrent * t;
1537
1538    managerLock( manager );
1539
1540    t = getExistingTorrent( manager, torrentHash );
1541
1542    assert( t );
1543    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1544    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1545
1546    if( !t->isRunning )
1547    {
1548        t->isRunning = 1;
1549
1550        t->reconnectTimer = tr_timerNew( t->manager->session,
1551                                         reconnectPulse, t,
1552                                         RECONNECT_PERIOD_MSEC );
1553
1554        t->rechokeTimer = tr_timerNew( t->manager->session,
1555                                       rechokePulse, t,
1556                                       RECHOKE_PERIOD_MSEC );
1557
1558        reconnectPulse( t );
1559
1560        rechokePulse( t );
1561
1562        if( !tr_ptrArrayEmpty( &t->webseeds ) )
1563            refillSoon( t );
1564    }
1565
1566    managerUnlock( manager );
1567}
1568
1569static void
1570stopTorrent( Torrent * t )
1571{
1572    assert( torrentIsLocked( t ) );
1573
1574    t->isRunning = 0;
1575    tr_timerFree( &t->rechokeTimer );
1576    tr_timerFree( &t->reconnectTimer );
1577
1578    /* disconnect the peers. */
1579    tr_ptrArrayForeach( &t->peers, (PtrArrayForeachFunc)peerDestructor );
1580    tr_ptrArrayClear( &t->peers );
1581
1582    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1583     * which removes the handshake from t->outgoingHandshakes... */
1584    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
1585        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
1586}
1587
1588void
1589tr_peerMgrStopTorrent( tr_peerMgr *    manager,
1590                       const uint8_t * torrentHash )
1591{
1592    managerLock( manager );
1593
1594    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1595
1596    managerUnlock( manager );
1597}
1598
1599void
1600tr_peerMgrAddTorrent( tr_peerMgr * manager,
1601                      tr_torrent * tor )
1602{
1603    Torrent * t;
1604
1605    managerLock( manager );
1606
1607    assert( tor );
1608    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1609
1610    t = torrentConstructor( manager, tor );
1611    tr_ptrArrayInsertSorted( &manager->torrents, t, torrentCompare );
1612
1613    managerUnlock( manager );
1614}
1615
1616void
1617tr_peerMgrRemoveTorrent( tr_peerMgr *    manager,
1618                         const uint8_t * torrentHash )
1619{
1620    Torrent * t;
1621
1622    managerLock( manager );
1623
1624    t = getExistingTorrent( manager, torrentHash );
1625    assert( t );
1626    stopTorrent( t );
1627    tr_ptrArrayRemoveSorted( &manager->torrents, t, torrentCompare );
1628    torrentDestructor( t );
1629
1630    managerUnlock( manager );
1631}
1632
1633void
1634tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1635                               const uint8_t *    torrentHash,
1636                               int8_t *           tab,
1637                               unsigned int       tabCount )
1638{
1639    tr_piece_index_t   i;
1640    const Torrent *    t;
1641    const tr_torrent * tor;
1642    float              interval;
1643    tr_bool            isSeed;
1644    int                peerCount;
1645    const tr_peer **   peers;
1646    managerLock( manager );
1647
1648    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1649    tor = t->tor;
1650    interval = tor->info.pieceCount / (float)tabCount;
1651    isSeed = tor && ( tr_cpGetStatus ( &tor->completion ) == TR_SEED );
1652    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1653    peerCount = tr_ptrArraySize( &t->peers );
1654
1655    memset( tab, 0, tabCount );
1656
1657    for( i = 0; tor && i < tabCount; ++i )
1658    {
1659        const int piece = i * interval;
1660
1661        if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
1662            tab[i] = -1;
1663        else if( peerCount ) {
1664            int j;
1665            for( j = 0; j < peerCount; ++j )
1666                if( tr_bitfieldHas( peers[j]->have, i ) )
1667                    ++tab[i];
1668        }
1669    }
1670
1671    managerUnlock( manager );
1672}
1673
1674/* Returns the pieces that are available from peers */
1675tr_bitfield*
1676tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1677                        const uint8_t *    torrentHash )
1678{
1679    int i;
1680    int peerCount;
1681    Torrent * t;
1682    const tr_peer ** peers;
1683    tr_bitfield * pieces;
1684    managerLock( manager );
1685
1686    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1687    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1688    peerCount = tr_ptrArraySize( &t->peers );
1689    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1690    for( i=0; i<peerCount; ++i )
1691        tr_bitfieldOr( pieces, peers[i]->have );
1692
1693    managerUnlock( manager );
1694    return pieces;
1695}
1696
1697int
1698tr_peerMgrHasConnections( const tr_peerMgr * manager,
1699                          const uint8_t *    torrentHash )
1700{
1701    int ret;
1702    const Torrent * t;
1703    managerLock( manager );
1704
1705    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1706    ret = t && ( !tr_ptrArrayEmpty( &t->peers ) || !tr_ptrArrayEmpty( &t->webseeds ) );
1707
1708    managerUnlock( manager );
1709    return ret;
1710}
1711
1712void
1713tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1714                        const uint8_t    * torrentHash,
1715                        int              * setmePeersKnown,
1716                        int              * setmePeersConnected,
1717                        int              * setmeSeedsConnected,
1718                        int              * setmeWebseedsSendingToUs,
1719                        int              * setmePeersSendingToUs,
1720                        int              * setmePeersGettingFromUs,
1721                        int              * setmePeersFrom )
1722{
1723    int i, size;
1724    const Torrent * t;
1725    const tr_peer ** peers;
1726    const tr_webseed ** webseeds;
1727
1728    managerLock( manager );
1729
1730    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1731    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1732    size = tr_ptrArraySize( &t->peers );
1733
1734    *setmePeersKnown           = tr_ptrArraySize( &t->pool );
1735    *setmePeersConnected       = 0;
1736    *setmeSeedsConnected       = 0;
1737    *setmePeersGettingFromUs   = 0;
1738    *setmePeersSendingToUs     = 0;
1739    *setmeWebseedsSendingToUs  = 0;
1740
1741    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1742        setmePeersFrom[i] = 0;
1743
1744    for( i=0; i<size; ++i )
1745    {
1746        const tr_peer * peer = peers[i];
1747        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1748
1749        if( peer->io == NULL ) /* not connected */
1750            continue;
1751
1752        ++*setmePeersConnected;
1753
1754        ++setmePeersFrom[atom->from];
1755
1756        if( clientIsDownloadingFrom( peer ) )
1757            ++*setmePeersSendingToUs;
1758
1759        if( clientIsUploadingTo( peer ) )
1760            ++*setmePeersGettingFromUs;
1761
1762        if( atom->flags & ADDED_F_SEED_FLAG )
1763            ++*setmeSeedsConnected;
1764    }
1765
1766    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1767    size = tr_ptrArraySize( &t->webseeds );
1768    for( i=0; i<size; ++i )
1769        if( tr_webseedIsActive( webseeds[i] ) )
1770            ++*setmeWebseedsSendingToUs;
1771
1772    managerUnlock( manager );
1773}
1774
1775float*
1776tr_peerMgrWebSpeeds( const tr_peerMgr * manager,
1777                     const uint8_t *    torrentHash )
1778{
1779    const Torrent * t;
1780    const tr_webseed ** webseeds;
1781    int i;
1782    int webseedCount;
1783    float * ret;
1784    uint64_t now;
1785
1786    assert( manager );
1787    managerLock( manager );
1788
1789    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1790    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1791    webseedCount = tr_ptrArraySize( &t->webseeds );
1792    assert( webseedCount == t->tor->info.webseedCount );
1793    ret = tr_new0( float, webseedCount );
1794    now = tr_date( );
1795
1796    for( i=0; i<webseedCount; ++i )
1797        if( !tr_webseedGetSpeed( webseeds[i], now, &ret[i] ) )
1798            ret[i] = -1.0;
1799
1800    managerUnlock( manager );
1801    return ret;
1802}
1803
1804double
1805tr_peerGetPieceSpeed( const tr_peer * peer, uint64_t now, tr_direction direction )
1806{
1807    return peer->io ? tr_peerIoGetPieceSpeed( peer->io, now, direction ) : 0.0;
1808}
1809
1810
1811struct tr_peer_stat *
1812tr_peerMgrPeerStats( const   tr_peerMgr  * manager,
1813                     const   uint8_t     * torrentHash,
1814                     int                 * setmeCount )
1815{
1816    int i, size;
1817    const Torrent * t;
1818    const tr_peer ** peers;
1819    tr_peer_stat * ret;
1820    uint64_t now;
1821
1822    assert( manager );
1823    managerLock( manager );
1824
1825    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1826    size = tr_ptrArraySize( &t->peers );
1827    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1828    ret = tr_new0( tr_peer_stat, size );
1829    now = tr_date( );
1830
1831    for( i = 0; i < size; ++i )
1832    {
1833        char *                   pch;
1834        const tr_peer *          peer = peers[i];
1835        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1836        tr_peer_stat *           stat = ret + i;
1837        tr_address               norm_addr;
1838
1839        norm_addr = peer->addr;
1840        tr_normalizeV4Mapped( &norm_addr );
1841        tr_ntop( &norm_addr, stat->addr, sizeof( stat->addr ) );
1842        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
1843                   sizeof( stat->client ) );
1844        stat->port               = ntohs( peer->port );
1845        stat->from               = atom->from;
1846        stat->progress           = peer->progress;
1847        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1848        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER );
1849        stat->rateToClient       = tr_peerGetPieceSpeed( peer, now, TR_PEER_TO_CLIENT );
1850        stat->peerIsChoked       = peer->peerIsChoked;
1851        stat->peerIsInterested   = peer->peerIsInterested;
1852        stat->clientIsChoked     = peer->clientIsChoked;
1853        stat->clientIsInterested = peer->clientIsInterested;
1854        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1855        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1856        stat->isUploadingTo      = clientIsUploadingTo( peer );
1857        stat->isSeed             = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
1858
1859        pch = stat->flagStr;
1860        if( t->optimistic == peer ) *pch++ = 'O';
1861        if( stat->isDownloadingFrom ) *pch++ = 'D';
1862        else if( stat->clientIsInterested ) *pch++ = 'd';
1863        if( stat->isUploadingTo ) *pch++ = 'U';
1864        else if( stat->peerIsInterested ) *pch++ = 'u';
1865        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
1866        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1867        if( stat->isEncrypted ) *pch++ = 'E';
1868        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1869        if( stat->isIncoming ) *pch++ = 'I';
1870        *pch = '\0';
1871    }
1872
1873    *setmeCount = size;
1874
1875    managerUnlock( manager );
1876    return ret;
1877}
1878
1879/**
1880***
1881**/
1882
1883struct ChokeData
1884{
1885    tr_bool         doUnchoke;
1886    tr_bool         isInterested;
1887    tr_bool         isChoked;
1888    int             rate;
1889    tr_peer *       peer;
1890};
1891
1892static int
1893compareChoke( const void * va,
1894              const void * vb )
1895{
1896    const struct ChokeData * a = va;
1897    const struct ChokeData * b = vb;
1898
1899    if( a->rate != b->rate ) /* prefer higher overall speeds */
1900        return a->rate > b->rate ? -1 : 1;
1901
1902    if( a->isChoked != b->isChoked ) /* prefer unchoked */
1903        return a->isChoked ? 1 : -1;
1904
1905    return 0;
1906}
1907
1908static int
1909isNew( const tr_peer * peer )
1910{
1911    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1912}
1913
1914static int
1915isSame( const tr_peer * peer )
1916{
1917    return peer && peer->client && strstr( peer->client, "Transmission" );
1918}
1919
1920/**
1921***
1922**/
1923
1924static void
1925rechoke( Torrent * t )
1926{
1927    int i, size, unchokedInterested;
1928    const int peerCount = tr_ptrArraySize( &t->peers );
1929    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1930    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1931    const tr_session * session = t->manager->session;
1932    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
1933    const uint64_t now = tr_date( );
1934
1935    assert( torrentIsLocked( t ) );
1936
1937    /* sort the peers by preference and rate */
1938    for( i = 0, size = 0; i < peerCount; ++i )
1939    {
1940        tr_peer * peer = peers[i];
1941        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1942
1943        if( peer->progress >= 1.0 ) /* choke all seeds */
1944        {
1945            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1946        }
1947        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
1948        {
1949            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1950        }
1951        else if( chokeAll ) /* choke everyone if we're not uploading */
1952        {
1953            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1954        }
1955        else
1956        {
1957            struct ChokeData * n = &choke[size++];
1958            n->peer         = peer;
1959            n->isInterested = peer->peerIsInterested;
1960            n->isChoked     = peer->peerIsChoked;
1961            n->rate         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER ) * 1024;
1962        }
1963    }
1964
1965    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
1966
1967    /**
1968     * Reciprocation and number of uploads capping is managed by unchoking
1969     * the N peers which have the best upload rate and are interested.
1970     * This maximizes the client's download rate. These N peers are
1971     * referred to as downloaders, because they are interested in downloading
1972     * from the client.
1973     *
1974     * Peers which have a better upload rate (as compared to the downloaders)
1975     * but aren't interested get unchoked. If they become interested, the
1976     * downloader with the worst upload rate gets choked. If a client has
1977     * a complete file, it uses its upload rate rather than its download
1978     * rate to decide which peers to unchoke.
1979     */
1980    unchokedInterested = 0;
1981    for( i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i ) {
1982        choke[i].doUnchoke = 1;
1983        if( choke[i].isInterested )
1984            ++unchokedInterested;
1985    }
1986
1987    /* optimistic unchoke */
1988    if( i < size )
1989    {
1990        int n;
1991        struct ChokeData * c;
1992        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
1993
1994        for( ; i<size; ++i )
1995        {
1996            if( choke[i].isInterested )
1997            {
1998                const tr_peer * peer = choke[i].peer;
1999                int x = 1, y;
2000                if( isNew( peer ) ) x *= 3;
2001                if( isSame( peer ) ) x *= 3;
2002                for( y=0; y<x; ++y )
2003                    tr_ptrArrayAppend( &randPool, &choke[i] );
2004            }
2005        }
2006
2007        if(( n = tr_ptrArraySize( &randPool )))
2008        {
2009            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
2010            c->doUnchoke = 1;
2011            t->optimistic = c->peer;
2012        }
2013
2014        tr_ptrArrayDestruct( &randPool, NULL );
2015    }
2016
2017    for( i=0; i<size; ++i )
2018        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
2019
2020    /* cleanup */
2021    tr_free( choke );
2022}
2023
2024static int
2025rechokePulse( void * vtorrent )
2026{
2027    Torrent * t = vtorrent;
2028
2029    torrentLock( t );
2030    rechoke( t );
2031    torrentUnlock( t );
2032    return TRUE;
2033}
2034
2035/***
2036****
2037****  Life and Death
2038****
2039***/
2040
2041static int
2042shouldPeerBeClosed( const Torrent * t,
2043                    const tr_peer * peer,
2044                    int             peerCount )
2045{
2046    const tr_torrent *       tor = t->tor;
2047    const time_t             now = time( NULL );
2048    const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
2049
2050    /* if it's marked for purging, close it */
2051    if( peer->doPurge )
2052    {
2053        tordbg( t, "purging peer %s because its doPurge flag is set",
2054                tr_peerIoAddrStr( &atom->addr, atom->port ) );
2055        return TRUE;
2056    }
2057
2058    /* if we're seeding and the peer has everything we have,
2059     * and enough time has passed for a pex exchange, then disconnect */
2060    if( tr_torrentIsSeed( tor ) )
2061    {
2062        int peerHasEverything;
2063        if( atom->flags & ADDED_F_SEED_FLAG )
2064            peerHasEverything = TRUE;
2065        else if( peer->progress < tr_cpPercentDone( &tor->completion ) )
2066            peerHasEverything = FALSE;
2067        else {
2068            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( &tor->completion ) );
2069            tr_bitfieldDifference( tmp, peer->have );
2070            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2071            tr_bitfieldFree( tmp );
2072        }
2073
2074        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 )))
2075        {
2076            tordbg( t, "purging peer %s because we're both seeds",
2077                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
2078            return TRUE;
2079        }
2080    }
2081
2082    /* disconnect if it's been too long since piece data has been transferred.
2083     * this is on a sliding scale based on number of available peers... */
2084    {
2085        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2086        /* if we have >= relaxIfFewerThan, strictness is 100%.
2087         * if we have zero connections, strictness is 0% */
2088        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
2089            ? 1.0
2090            : peerCount / (float)relaxStrictnessIfFewerThanN;
2091        const int lo = MIN_UPLOAD_IDLE_SECS;
2092        const int hi = MAX_UPLOAD_IDLE_SECS;
2093        const int limit = hi - ( ( hi - lo ) * strictness );
2094        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
2095/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
2096        if( idleTime > limit ) {
2097            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
2098                       tr_peerIoAddrStr( &atom->addr, atom->port ), idleTime );
2099            return TRUE;
2100        }
2101    }
2102
2103    return FALSE;
2104}
2105
2106static tr_peer **
2107getPeersToClose( Torrent * t, int * setmeSize )
2108{
2109    int i, peerCount, outsize;
2110    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
2111    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2112
2113    assert( torrentIsLocked( t ) );
2114
2115    for( i = outsize = 0; i < peerCount; ++i )
2116        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
2117            ret[outsize++] = peers[i];
2118
2119    *setmeSize = outsize;
2120    return ret;
2121}
2122
2123static int
2124compareCandidates( const void * va,
2125                   const void * vb )
2126{
2127    const struct peer_atom * a = *(const struct peer_atom**) va;
2128    const struct peer_atom * b = *(const struct peer_atom**) vb;
2129
2130    /* <Charles> Here we would probably want to try reconnecting to
2131     * peers that had most recently given us data. Lots of users have
2132     * trouble with resets due to their routers and/or ISPs. This way we
2133     * can quickly recover from an unwanted reset. So we sort
2134     * piece_data_time in descending order.
2135     */
2136
2137    if( a->piece_data_time != b->piece_data_time )
2138        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2139
2140    if( a->numFails != b->numFails )
2141        return a->numFails < b->numFails ? -1 : 1;
2142
2143    if( a->time != b->time )
2144        return a->time < b->time ? -1 : 1;
2145
2146    /* all other things being equal, prefer peers whose
2147     * information comes from a more reliable source */
2148    if( a->from != b->from )
2149        return a->from < b->from ? -1 : 1;
2150
2151    return 0;
2152}
2153
2154static int
2155getReconnectIntervalSecs( const struct peer_atom * atom )
2156{
2157    int          sec;
2158    const time_t now = time( NULL );
2159
2160    /* if we were recently connected to this peer and transferring piece
2161     * data, try to reconnect to them sooner rather that later -- we don't
2162     * want network troubles to get in the way of a good peer. */
2163    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2164        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2165
2166    /* don't allow reconnects more often than our minimum */
2167    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2168        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2169
2170    /* otherwise, the interval depends on how many times we've tried
2171     * and failed to connect to the peer */
2172    else switch( atom->numFails ) {
2173        case 0: sec = 0; break;
2174        case 1: sec = 5; break;
2175        case 2: sec = 2 * 60; break;
2176        case 3: sec = 15 * 60; break;
2177        case 4: sec = 30 * 60; break;
2178        case 5: sec = 60 * 60; break;
2179        default: sec = 120 * 60; break;
2180    }
2181
2182    return sec;
2183}
2184
2185static struct peer_atom **
2186getPeerCandidates( Torrent * t, int * setmeSize )
2187{
2188    int                 i, atomCount, retCount;
2189    struct peer_atom ** atoms;
2190    struct peer_atom ** ret;
2191    const time_t        now = time( NULL );
2192    const int           seed = tr_torrentIsSeed( t->tor );
2193
2194    assert( torrentIsLocked( t ) );
2195
2196    atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
2197    ret = tr_new( struct peer_atom*, atomCount );
2198    for( i = retCount = 0; i < atomCount; ++i )
2199    {
2200        int                interval;
2201        struct peer_atom * atom = atoms[i];
2202
2203        /* peer fed us too much bad data ... we only keep it around
2204         * now to weed it out in case someone sends it to us via pex */
2205        if( atom->myflags & MYFLAG_BANNED )
2206            continue;
2207
2208        /* peer was unconnectable before, so we're not going to keep trying.
2209         * this is needs a separate flag from `banned', since if they try
2210         * to connect to us later, we'll let them in */
2211        if( atom->myflags & MYFLAG_UNREACHABLE )
2212            continue;
2213
2214        /* we don't need two connections to the same peer... */
2215        if( peerIsInUse( t, &atom->addr ) )
2216            continue;
2217
2218        /* no need to connect if we're both seeds... */
2219        if( seed && ( ( atom->flags & ADDED_F_SEED_FLAG ) ||
2220                      ( atom->uploadOnly == UPLOAD_ONLY_YES ) ) )
2221            continue;
2222
2223        /* don't reconnect too often */
2224        interval = getReconnectIntervalSecs( atom );
2225        if( ( now - atom->time ) < interval )
2226        {
2227            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2228                    i, tr_peerIoAddrStr( &atom->addr, atom->port ), interval );
2229            continue;
2230        }
2231
2232        /* Don't connect to peers in our blocklist */
2233        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2234            continue;
2235
2236        ret[retCount++] = atom;
2237    }
2238
2239    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2240    *setmeSize = retCount;
2241    return ret;
2242}
2243
2244static int
2245reconnectPulse( void * vtorrent )
2246{
2247    Torrent *     t = vtorrent;
2248    static time_t prevTime = 0;
2249    static int    newConnectionsThisSecond = 0;
2250    time_t        now;
2251
2252    torrentLock( t );
2253
2254    now = time( NULL );
2255    if( prevTime != now )
2256    {
2257        prevTime = now;
2258        newConnectionsThisSecond = 0;
2259    }
2260
2261    if( !t->isRunning )
2262    {
2263        removeAllPeers( t );
2264    }
2265    else
2266    {
2267        int i, nCandidates, nBad;
2268        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
2269        struct tr_peer ** connections = getPeersToClose( t, &nBad );
2270
2271        //if( nBad || nCandidates )
2272            tordbg( t, "reconnect pulse for [%s]: %d bad connections, "
2273                    "%d connection candidates, %d atoms, max per pulse is %d",
2274                    t->tor->info.name, nBad, nCandidates,
2275                    tr_ptrArraySize( &t->pool ),
2276                    (int)MAX_RECONNECTIONS_PER_PULSE );
2277
2278        /* disconnect some peers.
2279           if we transferred piece data, then they might be good peers,
2280           so reset their `numFails' weight to zero.  otherwise we connected
2281           to them fruitlessly, so mark it as another fail */
2282        for( i = 0; i < nBad; ++i ) {
2283            tr_peer * peer = connections[i];
2284            struct peer_atom * atom = getExistingAtom( t, &peer->addr );
2285            if( atom->piece_data_time )
2286                atom->numFails = 0;
2287            else
2288                ++atom->numFails;
2289            tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
2290            removePeer( t, peer );
2291        }
2292
2293tordbg( t, "nCandidates is %d, MAX_RECONNECTIONS_PER_PULSE is %d, getPeerCount(t) is %d, getMaxPeerCount(t) is %d, newConnectionsThisSecond is %d, MAX_CONNECTIONS_PER_SECOND is %d",
2294        (int)nCandidates, (int)MAX_RECONNECTIONS_PER_PULSE, (int)getPeerCount( t ), (int)getMaxPeerCount( t->tor ), (int)newConnectionsThisSecond, (int)MAX_CONNECTIONS_PER_SECOND );
2295
2296        /* add some new ones */
2297        for( i = 0;    ( i < nCandidates )
2298           && ( i < MAX_RECONNECTIONS_PER_PULSE )
2299           && ( getPeerCount( t ) < getMaxPeerCount( t->tor ) )
2300           && ( newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND ); ++i )
2301        {
2302            tr_peerMgr *       mgr = t->manager;
2303            struct peer_atom * atom = candidates[i];
2304            tr_peerIo *        io;
2305
2306            tordbg( t, "Starting an OUTGOING connection with %s",
2307                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2308
2309            io = tr_peerIoNewOutgoing( mgr->session, mgr->session->bandwidth, &atom->addr, atom->port, t->hash );
2310
2311            if( io == NULL )
2312            {
2313                tordbg( t, "peerIo not created; marking peer %s as unreachable",
2314                        tr_peerIoAddrStr( &atom->addr, atom->port ) );
2315                atom->myflags |= MYFLAG_UNREACHABLE;
2316            }
2317            else
2318            {
2319                tr_handshake * handshake = tr_handshakeNew( io,
2320                                                            mgr->session->encryptionMode,
2321                                                            myHandshakeDoneCB,
2322                                                            mgr );
2323
2324                assert( tr_peerIoGetTorrentHash( io ) );
2325
2326                tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewOutgoing() */
2327
2328                ++newConnectionsThisSecond;
2329
2330                tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
2331                                         handshakeCompare );
2332            }
2333
2334            atom->time = time( NULL );
2335        }
2336
2337        /* cleanup */
2338        tr_free( connections );
2339        tr_free( candidates );
2340    }
2341
2342    torrentUnlock( t );
2343    return TRUE;
2344}
2345
2346/****
2347*****
2348*****  BANDWIDTH ALLOCATION
2349*****
2350****/
2351
2352static void
2353pumpAllPeers( tr_peerMgr * mgr )
2354{
2355    const int torrentCount = tr_ptrArraySize( &mgr->torrents );
2356    int       i, j;
2357
2358    for( i=0; i<torrentCount; ++i )
2359    {
2360        Torrent * t = tr_ptrArrayNth( &mgr->torrents, i );
2361        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
2362        {
2363            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
2364            tr_peerMsgsPulse( peer->msgs );
2365        }
2366    }
2367}
2368
2369static int
2370bandwidthPulse( void * vmgr )
2371{
2372    tr_peerMgr * mgr = vmgr;
2373    managerLock( mgr );
2374
2375    /* FIXME: this next line probably isn't necessary... */
2376    pumpAllPeers( mgr );
2377
2378    /* allocate bandwidth to the peers */
2379    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
2380    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
2381
2382    managerUnlock( mgr );
2383    return TRUE;
2384}
Note: See TracBrowser for help on using the repository browser.