source: trunk/libtransmission/peer-mgr.c @ 7948

Last change on this file since 7948 was 7948, checked in by charles, 13 years ago

(trunk libT) #1829: age out the refill list every 10 seconds, not every 1 second

  • Property svn:keywords set to Date Rev Author Id
File size: 70.7 KB
Line 
1
2/*
3 * This file Copyright (C) 2007-2009 Charles Kerr <charles@transmissionbt.com>
4 *
5 * This file is licensed by the GPL version 2.  Works owned by the
6 * Transmission project are granted a special exemption to clause 2(b)
7 * so that the bulk of its code can remain under the MIT license.
8 * This exemption does not extend to derived works not owned by
9 * the Transmission project.
10 *
11 * $Id: peer-mgr.c 7948 2009-02-25 22:15:04Z charles $
12 */
13
14#include <assert.h>
15#include <errno.h>
16#include <string.h> /* memcpy, memcmp, strstr */
17#include <stdlib.h> /* qsort */
18#include <limits.h> /* INT_MAX */
19
20#include <event.h>
21
22#include "transmission.h"
23#include "session.h"
24#include "bandwidth.h"
25#include "bencode.h"
26#include "blocklist.h"
27#include "clients.h"
28#include "completion.h"
29#include "crypto.h"
30#include "fdlimit.h"
31#include "handshake.h"
32#include "inout.h" /* tr_ioTestPiece */
33#include "net.h"
34#include "peer-io.h"
35#include "peer-mgr.h"
36#include "peer-msgs.h"
37#include "ptrarray.h"
38#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
39#include "torrent.h"
40#include "trevent.h"
41#include "utils.h"
42#include "webseed.h"
43
44enum
45{
46    /* how frequently to change which peers are choked */
47    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
48
49    /* minimum interval for refilling peers' request lists */
50    REFILL_PERIOD_MSEC = 400,
51   
52    /* how frequently to reallocate bandwidth */
53    BANDWIDTH_PERIOD_MSEC = 500,
54
55    /* how frequently to age out old piece request lists */
56    REFILL_UPKEEP_PERIOD_MSEC = 10000,
57
58    /* how frequently to decide which peers live and die */
59    RECONNECT_PERIOD_MSEC = 500,
60
61    /* when many peers are available, keep idle ones this long */
62    MIN_UPLOAD_IDLE_SECS = ( 30 ),
63
64    /* when few peers are available, keep idle ones this long */
65    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
66
67    /* max # of peers to ask fer per torrent per reconnect pulse */
68    MAX_RECONNECTIONS_PER_PULSE = 16,
69
70    /* max number of peers to ask for per second overall.
71    * this throttle is to avoid overloading the router */
72    MAX_CONNECTIONS_PER_SECOND = 32,
73
74    /* number of bad pieces a peer is allowed to send before we ban them */
75    MAX_BAD_PIECES_PER_PEER = 5,
76
77    /* amount of time to keep a list of request pieces lying around
78       before it's considered too old and needs to be rebuilt */
79    PIECE_LIST_SHELF_LIFE_SECS = 60,
80
81    /* use for bitwise operations w/peer_atom.myflags */
82    MYFLAG_BANNED = 1,
83
84    /* use for bitwise operations w/peer_atom.myflags */
85    /* unreachable for now... but not banned.
86     * if they try to connect to us it's okay */
87    MYFLAG_UNREACHABLE = 2,
88
89    /* the minimum we'll wait before attempting to reconnect to a peer */
90    MINIMUM_RECONNECT_INTERVAL_SECS = 5
91};
92
93
94/**
95***
96**/
97
98enum
99{
100    UPLOAD_ONLY_UKNOWN,
101    UPLOAD_ONLY_YES,
102    UPLOAD_ONLY_NO
103};
104
105/**
106 * Peer information that should be kept even before we've connected and
107 * after we've disconnected.  These are kept in a pool of peer_atoms to decide
108 * which ones would make good candidates for connecting to, and to watch out
109 * for banned peers.
110 *
111 * @see tr_peer
112 * @see tr_peermsgs
113 */
114struct peer_atom
115{
116    uint8_t     from;
117    uint8_t     flags;       /* these match the added_f flags */
118    uint8_t     myflags;     /* flags that aren't defined in added_f */
119    uint8_t     uploadOnly;  /* UPLOAD_ONLY_ */
120    tr_port     port;
121    uint16_t    numFails;
122    tr_address  addr;
123    time_t      time;        /* when the peer's connection status last changed */
124    time_t      piece_data_time;
125};
126
127struct tr_blockIterator
128{
129    time_t expirationDate;
130    struct tr_torrent_peers * t;
131    tr_block_index_t blockIndex, blockCount, *blocks;
132    tr_piece_index_t pieceIndex, pieceCount, *pieces;
133};
134
135typedef struct tr_torrent_peers
136{
137    tr_bool                    isRunning;
138
139    uint8_t                    hash[SHA_DIGEST_LENGTH];
140    int                      * pendingRequestCount;
141    tr_ptrArray                outgoingHandshakes; /* tr_handshake */
142    tr_ptrArray                pool; /* struct peer_atom */
143    tr_ptrArray                peers; /* tr_peer */
144    tr_ptrArray                webseeds; /* tr_webseed */
145    tr_timer                 * refillTimer;
146    tr_torrent               * tor;
147    tr_peer                  * optimistic; /* the optimistic peer, or NULL if none */
148    struct tr_blockIterator  * refillQueue; /* used in refillPulse() */
149
150    struct tr_peerMgr        * manager;
151}
152Torrent;
153
154struct tr_peerMgr
155{
156    tr_session      * session;
157    tr_ptrArray       incomingHandshakes; /* tr_handshake */
158    tr_timer        * bandwidthTimer;
159    tr_timer        * rechokeTimer;
160    tr_timer        * reconnectTimer;
161    tr_timer        * refillUpkeepTimer;
162};
163
164#define tordbg( t, ... ) \
165    do { \
166        if( tr_deepLoggingIsActive( ) ) \
167            tr_deepLog( __FILE__, __LINE__, t->tor->info.name, __VA_ARGS__ ); \
168    } while( 0 )
169
170#define dbgmsg( ... ) \
171    do { \
172        if( tr_deepLoggingIsActive( ) ) \
173            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
174    } while( 0 )
175
176/**
177***
178**/
179
180static TR_INLINE void
181managerLock( const struct tr_peerMgr * manager )
182{
183    tr_globalLock( manager->session );
184}
185
186static TR_INLINE void
187managerUnlock( const struct tr_peerMgr * manager )
188{
189    tr_globalUnlock( manager->session );
190}
191
192static TR_INLINE void
193torrentLock( Torrent * torrent )
194{
195    managerLock( torrent->manager );
196}
197
198static TR_INLINE void
199torrentUnlock( Torrent * torrent )
200{
201    managerUnlock( torrent->manager );
202}
203
204static TR_INLINE int
205torrentIsLocked( const Torrent * t )
206{
207    return tr_globalIsLocked( t->manager->session );
208}
209
210/**
211***
212**/
213
214static int
215handshakeCompareToAddr( const void * va, const void * vb )
216{
217    const tr_handshake * a = va;
218
219    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
220}
221
222static int
223handshakeCompare( const void * a, const void * b )
224{
225    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
226}
227
228static tr_handshake*
229getExistingHandshake( tr_ptrArray      * handshakes,
230                      const tr_address * addr )
231{
232    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
233}
234
235static int
236comparePeerAtomToAddress( const void * va, const void * vb )
237{
238    const struct peer_atom * a = va;
239
240    return tr_compareAddresses( &a->addr, vb );
241}
242
243static int
244comparePeerAtoms( const void * va, const void * vb )
245{
246    const struct peer_atom * b = vb;
247
248    return comparePeerAtomToAddress( va, &b->addr );
249}
250
251/**
252***
253**/
254
255static Torrent*
256getExistingTorrent( tr_peerMgr *    manager,
257                    const uint8_t * hash )
258{
259    tr_torrent * tor = tr_torrentFindFromHash( manager->session, hash );
260
261    return tor == NULL ? NULL : tor->torrentPeers;
262}
263
264static int
265peerCompare( const void * va, const void * vb )
266{
267    const tr_peer * a = va;
268    const tr_peer * b = vb;
269
270    return tr_compareAddresses( &a->addr, &b->addr );
271}
272
273static int
274peerCompareToAddr( const void * va, const void * vb )
275{
276    const tr_peer * a = va;
277
278    return tr_compareAddresses( &a->addr, vb );
279}
280
281static tr_peer*
282getExistingPeer( Torrent          * torrent,
283                 const tr_address * addr )
284{
285    assert( torrentIsLocked( torrent ) );
286    assert( addr );
287
288    return tr_ptrArrayFindSorted( &torrent->peers, addr, peerCompareToAddr );
289}
290
291static struct peer_atom*
292getExistingAtom( const Torrent    * t,
293                 const tr_address * addr )
294{
295    Torrent * tt = (Torrent*)t;
296    assert( torrentIsLocked( t ) );
297    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
298}
299
300static tr_bool
301peerIsInUse( const Torrent    * ct,
302             const tr_address * addr )
303{
304    Torrent * t = (Torrent*) ct;
305
306    assert( torrentIsLocked ( t ) );
307
308    return getExistingPeer( t, addr )
309        || getExistingHandshake( &t->outgoingHandshakes, addr )
310        || getExistingHandshake( &t->manager->incomingHandshakes, addr );
311}
312
313static tr_peer*
314peerConstructor( const tr_address * addr )
315{
316    tr_peer * p;
317    p = tr_new0( tr_peer, 1 );
318    p->addr = *addr;
319    return p;
320}
321
322static tr_peer*
323getPeer( Torrent          * torrent,
324         const tr_address * addr )
325{
326    tr_peer * peer;
327
328    assert( torrentIsLocked( torrent ) );
329
330    peer = getExistingPeer( torrent, addr );
331
332    if( peer == NULL )
333    {
334        peer = peerConstructor( addr );
335        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
336    }
337
338    return peer;
339}
340
341static void
342peerDestructor( tr_peer * peer )
343{
344    assert( peer );
345
346    if( peer->msgs != NULL )
347    {
348        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
349        tr_peerMsgsFree( peer->msgs );
350    }
351
352    tr_peerIoClear( peer->io );
353    tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
354
355    tr_bitfieldFree( peer->have );
356    tr_bitfieldFree( peer->blame );
357    tr_free( peer->client );
358
359    tr_free( peer );
360}
361
362static void
363removePeer( Torrent * t,
364            tr_peer * peer )
365{
366    tr_peer *          removed;
367    struct peer_atom * atom;
368
369    assert( torrentIsLocked( t ) );
370
371    atom = getExistingAtom( t, &peer->addr );
372    assert( atom );
373    atom->time = time( NULL );
374
375    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
376    assert( removed == peer );
377    peerDestructor( removed );
378}
379
380static void
381removeAllPeers( Torrent * t )
382{
383    while( !tr_ptrArrayEmpty( &t->peers ) )
384        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
385}
386
387static void blockIteratorFree( struct tr_blockIterator ** inout );
388
389static void
390torrentDestructor( void * vt )
391{
392    Torrent * t = vt;
393    uint8_t   hash[SHA_DIGEST_LENGTH];
394
395    assert( t );
396    assert( !t->isRunning );
397    assert( torrentIsLocked( t ) );
398    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
399    assert( tr_ptrArrayEmpty( &t->peers ) );
400
401    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
402
403    tr_timerFree( &t->refillTimer );
404
405    blockIteratorFree( &t->refillQueue );
406    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
407    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
408    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
409    tr_ptrArrayDestruct( &t->peers, NULL );
410
411    tr_free( t->pendingRequestCount );
412    tr_free( t );
413}
414
415static void peerCallbackFunc( void * vpeer,
416                              void * vevent,
417                              void * vt );
418
419static Torrent*
420torrentConstructor( tr_peerMgr * manager,
421                    tr_torrent * tor )
422{
423    int       i;
424    Torrent * t;
425
426    t = tr_new0( Torrent, 1 );
427    t->manager = manager;
428    t->tor = tor;
429    t->pool = TR_PTR_ARRAY_INIT;
430    t->peers = TR_PTR_ARRAY_INIT;
431    t->webseeds = TR_PTR_ARRAY_INIT;
432    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
433    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
434
435    for( i = 0; i < tor->info.webseedCount; ++i )
436    {
437        tr_webseed * w =
438            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
439        tr_ptrArrayAppend( &t->webseeds, w );
440    }
441
442    return t;
443}
444
445
446static int bandwidthPulse ( void * vmgr );
447static int rechokePulse   ( void * vmgr );
448static int reconnectPulse ( void * vmgr );
449static int refillUpkeep   ( void * vmgr );
450
451tr_peerMgr*
452tr_peerMgrNew( tr_session * session )
453{
454    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
455
456    m->session = session;
457    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
458    m->bandwidthTimer    = tr_timerNew( session, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
459    m->rechokeTimer      = tr_timerNew( session, rechokePulse,   m, RECHOKE_PERIOD_MSEC );
460    m->reconnectTimer    = tr_timerNew( session, reconnectPulse, m, RECONNECT_PERIOD_MSEC );
461    m->refillUpkeepTimer = tr_timerNew( session, refillUpkeep,   m, REFILL_UPKEEP_PERIOD_MSEC );
462
463    rechokePulse( m );
464
465    return m;
466}
467
468void
469tr_peerMgrFree( tr_peerMgr * manager )
470{
471    managerLock( manager );
472
473    tr_timerFree( &manager->refillUpkeepTimer );
474    tr_timerFree( &manager->reconnectTimer );
475    tr_timerFree( &manager->rechokeTimer );
476    tr_timerFree( &manager->bandwidthTimer );
477
478    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
479     * the item from manager->handshakes, so this is a little roundabout... */
480    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
481        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
482
483    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
484
485    managerUnlock( manager );
486    tr_free( manager );
487}
488
489static int
490clientIsDownloadingFrom( const tr_peer * peer )
491{
492    return peer->clientIsInterested && !peer->clientIsChoked;
493}
494
495static int
496clientIsUploadingTo( const tr_peer * peer )
497{
498    return peer->peerIsInterested && !peer->peerIsChoked;
499}
500
501/***
502****
503***/
504
505tr_bool
506tr_peerMgrPeerIsSeed( const tr_torrent  * tor,
507                      const tr_address  * addr )
508{
509    tr_bool isSeed = FALSE;
510    const Torrent * t = tor->torrentPeers;
511    const struct peer_atom * atom = getExistingAtom( t, addr );
512
513    if( atom )
514        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
515
516    return isSeed;
517}
518
519/****
520*****
521*****  REFILL
522*****
523****/
524
525static void
526assertValidPiece( Torrent * t, tr_piece_index_t piece )
527{
528    assert( t );
529    assert( t->tor );
530    assert( piece < t->tor->info.pieceCount );
531}
532
533static int
534getPieceRequests( Torrent * t, tr_piece_index_t piece )
535{
536    assertValidPiece( t, piece );
537
538    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
539}
540
541static void
542incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
543{
544    assertValidPiece( t, piece );
545
546    if( t->pendingRequestCount == NULL )
547        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
548    t->pendingRequestCount[piece]++;
549}
550
551static void
552decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
553{
554    assertValidPiece( t, piece );
555
556    if( t->pendingRequestCount )
557        t->pendingRequestCount[piece]--;
558}
559
560struct tr_refill_piece
561{
562    tr_priority_t    priority;
563    uint32_t         piece;
564    uint32_t         peerCount;
565    int              random;
566    int              pendingRequestCount;
567    int              missingBlockCount;
568};
569
570static int
571compareRefillPiece( const void * aIn, const void * bIn )
572{
573    const struct tr_refill_piece * a = aIn;
574    const struct tr_refill_piece * b = bIn;
575
576    /* if one piece has a higher priority, it goes first */
577    if( a->priority != b->priority )
578        return a->priority > b->priority ? -1 : 1;
579
580    /* have a per-priority endgame */
581    if( a->pendingRequestCount != b->pendingRequestCount )
582        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
583
584    /* fewer missing pieces goes first */
585    if( a->missingBlockCount != b->missingBlockCount )
586        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
587
588    /* otherwise if one has fewer peers, it goes first */
589    if( a->peerCount != b->peerCount )
590        return a->peerCount < b->peerCount ? -1 : 1;
591
592    /* otherwise go with our random seed */
593    if( a->random != b->random )
594        return a->random < b->random ? -1 : 1;
595
596    return 0;
597}
598
599static tr_piece_index_t *
600getPreferredPieces( Torrent * t, tr_piece_index_t * pieceCount )
601{
602    const tr_torrent  * tor = t->tor;
603    const tr_info     * inf = &tor->info;
604    tr_piece_index_t    i;
605    tr_piece_index_t    poolSize = 0;
606    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
607    int                 peerCount;
608    const tr_peer    ** peers;
609
610    assert( torrentIsLocked( t ) );
611
612    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
613    peerCount = tr_ptrArraySize( &t->peers );
614
615    /* make a list of the pieces that we want but don't have */
616    for( i = 0; i < inf->pieceCount; ++i )
617        if( !tor->info.pieces[i].dnd
618                && !tr_cpPieceIsComplete( &tor->completion, i ) )
619            pool[poolSize++] = i;
620
621    /* sort the pool by which to request next */
622    if( poolSize > 1 )
623    {
624        tr_piece_index_t j;
625        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
626
627        for( j = 0; j < poolSize; ++j )
628        {
629            int k;
630            const tr_piece_index_t piece = pool[j];
631            struct tr_refill_piece * setme = p + j;
632
633            setme->piece = piece;
634            setme->priority = inf->pieces[piece].priority;
635            setme->peerCount = 0;
636            setme->random = tr_cryptoWeakRandInt( INT_MAX );
637            setme->pendingRequestCount = getPieceRequests( t, piece );
638            setme->missingBlockCount
639                         = tr_cpMissingBlocksInPiece( &tor->completion, piece );
640
641            for( k = 0; k < peerCount; ++k )
642            {
643                const tr_peer * peer = peers[k];
644                if( peer->peerIsInterested
645                        && !peer->clientIsChoked
646                        && tr_bitfieldHas( peer->have, piece ) )
647                    ++setme->peerCount;
648            }
649        }
650
651        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
652               compareRefillPiece );
653
654        for( j = 0; j < poolSize; ++j )
655            pool[j] = p[j].piece;
656
657        tr_free( p );
658    }
659
660    *pieceCount = poolSize;
661    return pool;
662}
663
664static struct tr_blockIterator*
665blockIteratorNew( Torrent * t )
666{
667    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
668    i->expirationDate = time( NULL ) + PIECE_LIST_SHELF_LIFE_SECS;
669    i->t = t;
670    i->pieces = getPreferredPieces( t, &i->pieceCount );
671    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCountInPiece );
672    tordbg( t, "creating new refill queue.. it contains %"PRIu32" pieces", i->pieceCount );
673    return i;
674}
675
676static tr_bool
677blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
678{
679    tr_bool found;
680    Torrent * t = i->t;
681    tr_torrent * tor = t->tor;
682
683    while( ( i->blockIndex == i->blockCount )
684        && ( i->pieceIndex < i->pieceCount ) )
685    {
686        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
687        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
688        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
689        tr_block_index_t block;
690
691        assert( index < tor->info.pieceCount );
692
693        i->blockCount = 0;
694        i->blockIndex = 0;
695        for( block=b; block!=e; ++block )
696            if( !tr_cpBlockIsComplete( &tor->completion, block ) )
697                i->blocks[i->blockCount++] = block;
698    }
699
700    assert( i->blockCount <= tor->blockCountInPiece );
701
702    if(( found = ( i->blockIndex < i->blockCount )))
703        *setme = i->blocks[i->blockIndex++];
704
705    return found;
706}
707
708static void
709blockIteratorSkipCurrentPiece( struct tr_blockIterator * i )
710{
711    i->blockIndex = i->blockCount;
712}
713
714static void
715blockIteratorFree( struct tr_blockIterator ** inout )
716{
717    struct tr_blockIterator * it = *inout;
718
719    if( it != NULL )
720    {
721        tr_free( it->blocks );
722        tr_free( it->pieces );
723        tr_free( it );
724    }
725
726    *inout = NULL;
727}
728
729static tr_peer**
730getPeersUploadingToClient( Torrent * t,
731                           int *     setmeCount )
732{
733    int j;
734    int peerCount = 0;
735    int retCount = 0;
736    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
737    tr_peer ** ret = tr_new( tr_peer *, peerCount );
738
739    j = 0; /* this is a temporary test to make sure we walk through all the peers */
740    if( peerCount )
741    {
742        /* Get a list of peers we're downloading from.
743           Pick a different starting point each time so all peers
744           get a chance at being the first in line */
745        const int fencepost = tr_cryptoWeakRandInt( peerCount );
746        int i = fencepost;
747        do {
748            if( clientIsDownloadingFrom( peers[i] ) )
749                ret[retCount++] = peers[i];
750            i = ( i + 1 ) % peerCount;
751            ++j;
752        } while( i != fencepost );
753    }
754    assert( j == peerCount );
755    *setmeCount = retCount;
756    return ret;
757}
758
759static uint32_t
760getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
761{
762    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
763    const uint64_t blockPos = tor->blockSize * b;
764    assert( blockPos >= piecePos );
765    return (uint32_t)( blockPos - piecePos );
766}
767
768static int
769refillUpkeep( void * vmgr )
770{
771    tr_torrent * tor = NULL;
772    tr_peerMgr * mgr = vmgr;
773    time_t now;
774    managerLock( mgr );
775
776    now = time( NULL );
777    while(( tor = tr_torrentNext( mgr->session, tor ))) {
778        Torrent * t = tor->torrentPeers;
779        if( t && t->refillQueue && ( t->refillQueue->expirationDate <= now ) ) {
780            tordbg( t, "refill queue is past its shelf date; discarding." );
781            blockIteratorFree( &t->refillQueue );
782        }
783    }
784
785    managerUnlock( mgr );
786    return TRUE;
787}
788
789static int
790refillPulse( void * vtorrent )
791{
792    tr_block_index_t block;
793    int peerCount;
794    int webseedCount;
795    tr_peer ** peers;
796    tr_webseed ** webseeds;
797    Torrent * t = vtorrent;
798    tr_torrent * tor = t->tor;
799    tr_bool hasNext = TRUE;
800
801    if( !t->isRunning )
802        return TRUE;
803    if( tr_torrentIsSeed( t->tor ) )
804        return TRUE;
805
806    torrentLock( t );
807    tordbg( t, "Refilling Request Buffers..." );
808
809    if( t->refillQueue == NULL )
810        t->refillQueue = blockIteratorNew( t );
811
812    peers = getPeersUploadingToClient( t, &peerCount );
813    webseedCount = tr_ptrArraySize( &t->webseeds );
814    webseeds = tr_memdup( tr_ptrArrayBase( &t->webseeds ),
815                          webseedCount * sizeof( tr_webseed* ) );
816
817    while( ( webseedCount || peerCount )
818        && (( hasNext = blockIteratorNext( t->refillQueue, &block ))) )
819    {
820        int j;
821        tr_bool handled = FALSE;
822
823        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
824        const uint32_t offset = getBlockOffsetInPiece( tor, block );
825        const uint32_t length = tr_torBlockCountBytes( tor, block );
826
827        assert( block < tor->blockCount );
828
829        /* find a peer who can ask for this block */
830        for( j=0; !handled && j<peerCount; )
831        {
832            const tr_addreq_t val = tr_peerMsgsAddRequest( peers[j]->msgs, index, offset, length );
833            switch( val )
834            {
835                case TR_ADDREQ_FULL:
836                case TR_ADDREQ_CLIENT_CHOKED:
837                    peers[j] = peers[--peerCount];
838                    break;
839
840                case TR_ADDREQ_MISSING:
841                case TR_ADDREQ_DUPLICATE:
842                    ++j;
843                    break;
844
845                case TR_ADDREQ_OK:
846                    incrementPieceRequests( t, index );
847                    handled = TRUE;
848                    break;
849
850                default:
851                    assert( 0 && "unhandled value" );
852                    break;
853            }
854        }
855
856        /* maybe one of the webseeds can do it */
857        for( j=0; !handled && j<webseedCount; )
858        {
859            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j], index, offset, length );
860            switch( val )
861            {
862                case TR_ADDREQ_FULL:
863                    webseeds[j] = webseeds[--webseedCount];
864                    break;
865
866                case TR_ADDREQ_OK:
867                    incrementPieceRequests( t, index );
868                    handled = TRUE;
869                    break;
870
871                default:
872                    assert( 0 && "unhandled value" );
873                    break;
874            }
875        }
876
877        if( !handled )
878            blockIteratorSkipCurrentPiece( t->refillQueue );
879    }
880
881    /* cleanup */
882    tr_free( webseeds );
883    tr_free( peers );
884
885    if( !hasNext ) {
886        tordbg( t, "refill queue has no more blocks to request... freeing (webseed count: %d, peer count: %d)", webseedCount, peerCount );
887        blockIteratorFree( &t->refillQueue );
888    }
889
890    t->refillTimer = NULL;
891    torrentUnlock( t );
892    return FALSE;
893}
894
895static void
896broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
897{
898    size_t i;
899    size_t peerCount;
900    tr_peer ** peers;
901
902    assert( torrentIsLocked( t ) );
903
904    tordbg( t, "got a block; cancelling any duplicate requests from peers %"PRIu32":%"PRIu32"->%"PRIu32, index, offset, length );
905
906    peerCount = tr_ptrArraySize( &t->peers );
907    peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
908    for( i=0; i<peerCount; ++i )
909        if( peers[i]->msgs )
910            tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
911}
912
913static void
914addStrike( Torrent * t,
915           tr_peer * peer )
916{
917    tordbg( t, "increasing peer %s strike count to %d",
918            tr_peerIoAddrStr( &peer->addr,
919                              peer->port ), peer->strikes + 1 );
920
921    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
922    {
923        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
924        atom->myflags |= MYFLAG_BANNED;
925        peer->doPurge = 1;
926        tordbg( t, "banning peer %s", tr_peerIoAddrStr( &atom->addr, atom->port ) );
927    }
928}
929
930static void
931gotBadPiece( Torrent *        t,
932             tr_piece_index_t pieceIndex )
933{
934    tr_torrent *   tor = t->tor;
935    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
936
937    tor->corruptCur += byteCount;
938    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
939}
940
941static void
942refillSoon( Torrent * t )
943{
944    if( t->refillTimer == NULL )
945        t->refillTimer = tr_timerNew( t->manager->session,
946                                      refillPulse, t,
947                                      REFILL_PERIOD_MSEC );
948}
949
950static void
951peerSuggestedPiece( Torrent            * t UNUSED,
952                    tr_peer            * peer UNUSED,
953                    tr_piece_index_t     pieceIndex UNUSED,
954                    int                  isFastAllowed UNUSED )
955{
956#if 0
957    assert( t );
958    assert( peer );
959    assert( peer->msgs );
960
961    /* is this a valid piece? */
962    if(  pieceIndex >= t->tor->info.pieceCount )
963        return;
964
965    /* don't ask for it if we've already got it */
966    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
967        return;
968
969    /* don't ask for it if they don't have it */
970    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
971        return;
972
973    /* don't ask for it if we're choked and it's not fast */
974    if( !isFastAllowed && peer->clientIsChoked )
975        return;
976
977    /* request the blocks that we don't have in this piece */
978    {
979        tr_block_index_t block;
980        const tr_torrent * tor = t->tor;
981        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
982        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
983
984        for( block=start; block<end; ++block )
985        {
986            if( !tr_cpBlockIsComplete( tor->completion, block ) )
987            {
988                const uint32_t offset = getBlockOffsetInPiece( tor, block );
989                const uint32_t length = tr_torBlockCountBytes( tor, block );
990                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
991                incrementPieceRequests( t, pieceIndex );
992            }
993        }
994    }
995#endif
996}
997
998static void
999fireRatioLimitHit( tr_torrent * tor )
1000{
1001    assert( tr_isTorrent( tor ) );
1002
1003    if( tor->ratio_limit_hit_func )
1004        tor->ratio_limit_hit_func( tor, tor->ratio_limit_hit_func_user_data );
1005}
1006
1007static void
1008peerCallbackFunc( void * vpeer, void * vevent, void * vt )
1009{
1010    tr_peer * peer = vpeer; /* may be NULL if peer is a webseed */
1011    Torrent * t = vt;
1012    const tr_peer_event * e = vevent;
1013
1014    torrentLock( t );
1015
1016    switch( e->eventType )
1017    {
1018        case TR_PEER_UPLOAD_ONLY:
1019            /* update our atom */
1020            if( peer ) {
1021                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1022                a->uploadOnly = e->uploadOnly ? UPLOAD_ONLY_YES : UPLOAD_ONLY_NO;
1023            }
1024            break;
1025
1026        case TR_PEER_NEED_REQ:
1027            refillSoon( t );
1028            break;
1029
1030        case TR_PEER_CANCEL:
1031            decrementPieceRequests( t, e->pieceIndex );
1032            break;
1033
1034        case TR_PEER_PEER_GOT_DATA:
1035        {
1036            const time_t now = time( NULL );
1037            tr_torrent * tor = t->tor;
1038            double seedRatio;
1039
1040            tor->activityDate = now;
1041
1042            if( e->wasPieceData )
1043                tor->uploadedCur += e->length;
1044
1045            /* update the stats */
1046            if( e->wasPieceData )
1047                tr_statsAddUploaded( tor->session, e->length );
1048
1049            /* update our atom */
1050            if( peer ) {
1051                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1052                if( e->wasPieceData )
1053                    a->piece_data_time = now;
1054            }
1055
1056            /* if we're seeding and we've reached our seed ratio limit, stop the torrent */
1057            if( tr_torrentIsSeed( tor ) && tr_torrentGetSeedRatio( tor, &seedRatio ) ) {
1058                const double up = tor->uploadedCur + tor->uploadedPrev;
1059                const double down = tor->downloadedCur + tor->downloadedPrev;
1060                const double ratio = tr_getRatio( up, down );
1061                if( ratio >= seedRatio ) {
1062                    tr_torrentStop( tor );
1063                   
1064                    /* set to no ratio limit to allow easy restarting */
1065                    tr_torrentSetRatioMode( tor, TR_RATIOLIMIT_UNLIMITED );
1066                   
1067                    fireRatioLimitHit( tor );
1068                }
1069            }
1070
1071            break;
1072        }
1073
1074        case TR_PEER_CLIENT_GOT_SUGGEST:
1075            if( peer )
1076                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1077            break;
1078
1079        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1080            if( peer )
1081                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1082            break;
1083
1084        case TR_PEER_CLIENT_GOT_DATA:
1085        {
1086            const time_t now = time( NULL );
1087            tr_torrent * tor = t->tor;
1088            tor->activityDate = now;
1089
1090            /* only add this to downloadedCur if we got it from a peer --
1091             * webseeds shouldn't count against our ratio.  As one tracker
1092             * admin put it, "Those pieces are downloaded directly from the
1093             * content distributor, not the peers, it is the tracker's job
1094             * to manage the swarms, not the web server and does not fit
1095             * into the jurisdiction of the tracker." */
1096            if( peer && e->wasPieceData )
1097                tor->downloadedCur += e->length;
1098
1099            /* update the stats */ 
1100            if( e->wasPieceData )
1101                tr_statsAddDownloaded( tor->session, e->length );
1102
1103            /* update our atom */
1104            if( peer ) {
1105                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1106                if( e->wasPieceData )
1107                    a->piece_data_time = now;
1108            }
1109
1110            break;
1111        }
1112
1113        case TR_PEER_PEER_PROGRESS:
1114        {
1115            if( peer )
1116            {
1117                struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1118                const int peerIsSeed = e->progress >= 1.0;
1119                if( peerIsSeed ) {
1120                    tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1121                    atom->flags |= ADDED_F_SEED_FLAG;
1122                } else {
1123                    tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1124                    atom->flags &= ~ADDED_F_SEED_FLAG;
1125                }
1126            }
1127            break;
1128        }
1129
1130        case TR_PEER_CLIENT_GOT_BLOCK:
1131        {
1132            tr_torrent * tor = t->tor;
1133
1134            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1135
1136            tr_cpBlockAdd( &tor->completion, block );
1137            decrementPieceRequests( t, e->pieceIndex );
1138
1139            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1140
1141            if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1142            {
1143                const tr_piece_index_t p = e->pieceIndex;
1144                const tr_bool ok = tr_ioTestPiece( tor, p, NULL, 0 );
1145
1146                if( !ok )
1147                {
1148                    tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1149                               (unsigned long)p );
1150                }
1151
1152                tr_torrentSetHasPiece( tor, p, ok );
1153                tr_torrentSetPieceChecked( tor, p, TRUE );
1154                tr_peerMgrSetBlame( tor, p, ok );
1155
1156                if( !ok )
1157                {
1158                    gotBadPiece( t, p );
1159                }
1160                else
1161                {
1162                    int i;
1163                    int peerCount;
1164                    tr_peer ** peers;
1165                    tr_file_index_t fileIndex;
1166
1167                    peerCount = tr_ptrArraySize( &t->peers );
1168                    peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1169                    for( i=0; i<peerCount; ++i )
1170                        tr_peerMsgsHave( peers[i]->msgs, p );
1171
1172                    for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex )
1173                    {
1174                        const tr_file * file = &tor->info.files[fileIndex];
1175                        if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) && tr_cpFileIsComplete( &tor->completion, fileIndex ) )
1176                        {
1177                            char * path = tr_buildPath( tor->downloadDir, file->name, NULL );
1178                            tordbg( t, "closing recently-completed file \"%s\"", path );
1179                            tr_fdFileClose( path );
1180                            tr_free( path );
1181                        }
1182                    }
1183                }
1184
1185                tr_torrentRecheckCompleteness( tor );
1186            }
1187            break;
1188        }
1189
1190        case TR_PEER_ERROR:
1191            if( e->err == EINVAL )
1192            {
1193                addStrike( t, peer );
1194                peer->doPurge = 1;
1195                tordbg( t, "setting %s doPurge flag because we got an EINVAL error", tr_peerIoAddrStr( &peer->addr, peer->port ) );
1196            }
1197            else if( ( e->err == ERANGE )
1198                  || ( e->err == EMSGSIZE )
1199                  || ( e->err == ENOTCONN ) )
1200            {
1201                /* some protocol error from the peer */
1202                peer->doPurge = 1;
1203                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error", tr_peerIoAddrStr( &peer->addr, peer->port ) );
1204            }
1205            else /* a local error, such as an IO error */
1206            {
1207                t->tor->error = e->err;
1208                tr_strlcpy( t->tor->errorString,
1209                            tr_strerror( t->tor->error ),
1210                            sizeof( t->tor->errorString ) );
1211                tr_torrentStop( t->tor );
1212            }
1213            break;
1214
1215        default:
1216            assert( 0 );
1217    }
1218
1219    torrentUnlock( t );
1220}
1221
1222static void
1223ensureAtomExists( Torrent          * t,
1224                  const tr_address * addr,
1225                  tr_port            port,
1226                  uint8_t            flags,
1227                  uint8_t            from )
1228{
1229    if( getExistingAtom( t, addr ) == NULL )
1230    {
1231        struct peer_atom * a;
1232        a = tr_new0( struct peer_atom, 1 );
1233        a->addr = *addr;
1234        a->port = port;
1235        a->flags = flags;
1236        a->from = from;
1237        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr( &a->addr, a->port ) );
1238        tr_ptrArrayInsertSorted( &t->pool, a, comparePeerAtoms );
1239    }
1240}
1241
1242static int
1243getMaxPeerCount( const tr_torrent * tor )
1244{
1245    return tor->maxConnectedPeers;
1246}
1247
1248static int
1249getPeerCount( const Torrent * t )
1250{
1251    return tr_ptrArraySize( &t->peers );// + tr_ptrArraySize( &t->outgoingHandshakes );
1252}
1253
1254/* FIXME: this is kind of a mess. */
1255static tr_bool
1256myHandshakeDoneCB( tr_handshake  * handshake,
1257                   tr_peerIo     * io,
1258                   tr_bool         isConnected,
1259                   const uint8_t * peer_id,
1260                   void          * vmanager )
1261{
1262    tr_bool            ok = isConnected;
1263    tr_bool            success = FALSE;
1264    tr_port            port;
1265    const tr_address * addr;
1266    tr_peerMgr       * manager = vmanager;
1267    Torrent          * t;
1268    tr_handshake     * ours;
1269
1270    assert( io );
1271    assert( tr_isBool( ok ) );
1272
1273    t = tr_peerIoHasTorrentHash( io )
1274        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1275        : NULL;
1276
1277    if( tr_peerIoIsIncoming ( io ) )
1278        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1279                                        handshake, handshakeCompare );
1280    else if( t )
1281        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1282                                        handshake, handshakeCompare );
1283    else
1284        ours = handshake;
1285
1286    assert( ours );
1287    assert( ours == handshake );
1288
1289    if( t )
1290        torrentLock( t );
1291
1292    addr = tr_peerIoGetAddress( io, &port );
1293
1294    if( !ok || !t || !t->isRunning )
1295    {
1296        if( t )
1297        {
1298            struct peer_atom * atom = getExistingAtom( t, addr );
1299            if( atom )
1300                ++atom->numFails;
1301        }
1302    }
1303    else /* looking good */
1304    {
1305        struct peer_atom * atom;
1306        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1307        atom = getExistingAtom( t, addr );
1308        atom->time = time( NULL );
1309        atom->piece_data_time = 0;
1310
1311        if( atom->myflags & MYFLAG_BANNED )
1312        {
1313            tordbg( t, "banned peer %s tried to reconnect",
1314                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
1315        }
1316        else if( tr_peerIoIsIncoming( io )
1317               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1318
1319        {
1320        }
1321        else
1322        {
1323            tr_peer * peer = getExistingPeer( t, addr );
1324
1325            if( peer ) /* we already have this peer */
1326            {
1327            }
1328            else
1329            {
1330                peer = getPeer( t, addr );
1331                tr_free( peer->client );
1332
1333                if( !peer_id )
1334                    peer->client = NULL;
1335                else {
1336                    char client[128];
1337                    tr_clientForId( client, sizeof( client ), peer_id );
1338                    peer->client = tr_strdup( client );
1339                }
1340
1341                peer->port = port;
1342                peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is
1343                                                                balanced by our unref in peerDestructor()  */
1344                tr_peerIoSetParent( peer->io, t->tor->bandwidth );
1345                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1346
1347                success = TRUE;
1348            }
1349        }
1350    }
1351
1352    if( t )
1353        torrentUnlock( t );
1354
1355    return success;
1356}
1357
1358void
1359tr_peerMgrAddIncoming( tr_peerMgr * manager,
1360                       tr_address * addr,
1361                       tr_port      port,
1362                       int          socket )
1363{
1364    managerLock( manager );
1365
1366    if( tr_sessionIsAddressBlocked( manager->session, addr ) )
1367    {
1368        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
1369        tr_netClose( socket );
1370    }
1371    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
1372    {
1373        tr_netClose( socket );
1374    }
1375    else /* we don't have a connetion to them yet... */
1376    {
1377        tr_peerIo *    io;
1378        tr_handshake * handshake;
1379
1380        io = tr_peerIoNewIncoming( manager->session, manager->session->bandwidth, addr, port, socket );
1381
1382        handshake = tr_handshakeNew( io,
1383                                     manager->session->encryptionMode,
1384                                     myHandshakeDoneCB,
1385                                     manager );
1386
1387        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
1388
1389        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
1390                                 handshakeCompare );
1391    }
1392
1393    managerUnlock( manager );
1394}
1395
1396static tr_bool
1397tr_isPex( const tr_pex * pex )
1398{
1399    return pex && tr_isAddress( &pex->addr );
1400}
1401
1402void
1403tr_peerMgrAddPex( tr_torrent   *  tor,
1404                  uint8_t         from,
1405                  const tr_pex *  pex )
1406{
1407    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
1408    {
1409        Torrent * t = tor->torrentPeers;
1410        managerLock( t->manager );
1411
1412        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
1413            if( tr_isValidPeerAddress( &pex->addr, pex->port ) )
1414                ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from );
1415
1416        managerUnlock( t->manager );
1417    }
1418}
1419
1420tr_pex *
1421tr_peerMgrCompactToPex( const void *    compact,
1422                        size_t          compactLen,
1423                        const uint8_t * added_f,
1424                        size_t          added_f_len,
1425                        size_t *        pexCount )
1426{
1427    size_t          i;
1428    size_t          n = compactLen / 6;
1429    const uint8_t * walk = compact;
1430    tr_pex *        pex = tr_new0( tr_pex, n );
1431
1432    for( i = 0; i < n; ++i )
1433    {
1434        pex[i].addr.type = TR_AF_INET;
1435        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
1436        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1437        if( added_f && ( n == added_f_len ) )
1438            pex[i].flags = added_f[i];
1439    }
1440
1441    *pexCount = n;
1442    return pex;
1443}
1444
1445tr_pex *
1446tr_peerMgrCompact6ToPex( const void    * compact,
1447                         size_t          compactLen,
1448                         const uint8_t * added_f,
1449                         size_t          added_f_len,
1450                         size_t        * pexCount )
1451{
1452    size_t          i;
1453    size_t          n = compactLen / 18;
1454    const uint8_t * walk = compact;
1455    tr_pex *        pex = tr_new0( tr_pex, n );
1456   
1457    for( i = 0; i < n; ++i )
1458    {
1459        pex[i].addr.type = TR_AF_INET6;
1460        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
1461        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1462        if( added_f && ( n == added_f_len ) )
1463            pex[i].flags = added_f[i];
1464    }
1465   
1466    *pexCount = n;
1467    return pex;
1468}
1469
1470tr_pex *
1471tr_peerMgrArrayToPex( const void * array,
1472                      size_t       arrayLen,
1473                      size_t      * pexCount )
1474{
1475    size_t          i;
1476    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
1477    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
1478    const uint8_t * walk = array;
1479    tr_pex        * pex = tr_new0( tr_pex, n );
1480   
1481    for( i = 0 ; i < n ; i++ ) {
1482        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
1483        tr_suspectAddress( &pex[i].addr, "tracker"  );
1484        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
1485        pex[i].flags = 0x00;
1486        walk += sizeof( tr_address ) + 2;
1487    }
1488   
1489    *pexCount = n;
1490    return pex;
1491}
1492
1493/**
1494***
1495**/
1496
1497void
1498tr_peerMgrSetBlame( tr_torrent     * tor,
1499                    tr_piece_index_t pieceIndex,
1500                    int              success )
1501{
1502    if( !success )
1503    {
1504        int        peerCount, i;
1505        Torrent *  t = tor->torrentPeers;
1506        tr_peer ** peers;
1507
1508        assert( torrentIsLocked( t ) );
1509
1510        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
1511        for( i = 0; i < peerCount; ++i )
1512        {
1513            tr_peer * peer = peers[i];
1514            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1515            {
1516                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1517                        tr_peerIoAddrStr( &peer->addr, peer->port ),
1518                        pieceIndex, (int)peer->strikes + 1 );
1519                addStrike( t, peer );
1520            }
1521        }
1522    }
1523}
1524
1525int
1526tr_pexCompare( const void * va, const void * vb )
1527{
1528    const tr_pex * a = va;
1529    const tr_pex * b = vb;
1530    int i;
1531
1532    assert( tr_isPex( a ) );
1533    assert( tr_isPex( b ) );
1534
1535    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
1536        return i;
1537
1538    if( a->port != b->port )
1539        return a->port < b->port ? -1 : 1;
1540
1541    return 0;
1542}
1543
1544static int
1545peerPrefersCrypto( const tr_peer * peer )
1546{
1547    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1548        return TRUE;
1549
1550    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1551        return FALSE;
1552
1553    return tr_peerIoIsEncrypted( peer->io );
1554}
1555
1556int
1557tr_peerMgrGetPeers( tr_torrent      * tor,
1558                    tr_pex         ** setme_pex,
1559                    uint8_t           af)
1560{
1561    int peersReturning = 0;
1562    const Torrent * t = tor->torrentPeers;
1563
1564    managerLock( t->manager );
1565
1566    {
1567        int i;
1568        const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1569        const int peerCount = tr_ptrArraySize( &t->peers );
1570        /* for now, this will waste memory on torrents that have both
1571         * ipv6 and ipv4 peers */
1572        tr_pex * pex = tr_new( tr_pex, peerCount );
1573        tr_pex * walk = pex;
1574
1575        for( i=0; i<peerCount; ++i )
1576        {
1577            const tr_peer * peer = peers[i];
1578            if( peer->addr.type == af )
1579            {
1580                const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1581
1582                assert( tr_isAddress( &peer->addr ) );
1583                walk->addr = peer->addr;
1584                walk->port = peer->port;
1585                walk->flags = 0;
1586                if( peerPrefersCrypto( peer ) )
1587                    walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1588                if( ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 ) )
1589                    walk->flags |= ADDED_F_SEED_FLAG;
1590                ++peersReturning;
1591                ++walk;
1592            }
1593        }
1594
1595        assert( ( walk - pex ) == peersReturning );
1596        qsort( pex, peersReturning, sizeof( tr_pex ), tr_pexCompare );
1597        *setme_pex = pex;
1598    }
1599
1600    managerUnlock( t->manager );
1601    return peersReturning;
1602}
1603
1604void
1605tr_peerMgrStartTorrent( tr_torrent * tor )
1606{
1607    Torrent * t = tor->torrentPeers;
1608
1609    managerLock( t->manager );
1610
1611    assert( t );
1612
1613    if( !t->isRunning )
1614    {
1615        t->isRunning = TRUE;
1616
1617        if( !tr_ptrArrayEmpty( &t->webseeds ) )
1618            refillSoon( t );
1619    }
1620
1621    managerUnlock( t->manager );
1622}
1623
1624static void
1625stopTorrent( Torrent * t )
1626{
1627    assert( torrentIsLocked( t ) );
1628
1629    t->isRunning = FALSE;
1630
1631    /* disconnect the peers. */
1632    tr_ptrArrayForeach( &t->peers, (PtrArrayForeachFunc)peerDestructor );
1633    tr_ptrArrayClear( &t->peers );
1634
1635    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1636     * which removes the handshake from t->outgoingHandshakes... */
1637    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
1638        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
1639}
1640
1641void
1642tr_peerMgrStopTorrent( tr_torrent * tor )
1643{
1644    Torrent * t = tor->torrentPeers;
1645
1646    managerLock( t->manager );
1647
1648    stopTorrent( t );
1649
1650    managerUnlock( t->manager );
1651}
1652
1653void
1654tr_peerMgrAddTorrent( tr_peerMgr * manager,
1655                      tr_torrent * tor )
1656{
1657    managerLock( manager );
1658
1659    assert( tor );
1660    assert( tor->torrentPeers == NULL );
1661
1662    tor->torrentPeers = torrentConstructor( manager, tor );
1663
1664    managerUnlock( manager );
1665}
1666
1667void
1668tr_peerMgrRemoveTorrent( tr_torrent * tor )
1669{
1670    tr_torrentLock( tor );
1671
1672    stopTorrent( tor->torrentPeers );
1673    torrentDestructor( tor->torrentPeers );
1674
1675    tr_torrentUnlock( tor );
1676}
1677
1678void
1679tr_peerMgrTorrentAvailability( const tr_torrent * tor,
1680                               int8_t           * tab,
1681                               unsigned int       tabCount )
1682{
1683    tr_piece_index_t   i;
1684    const Torrent *    t;
1685    float              interval;
1686    tr_bool            isSeed;
1687    int                peerCount;
1688    const tr_peer **   peers;
1689    tr_torrentLock( tor );
1690
1691    t = tor->torrentPeers;
1692    tor = t->tor;
1693    interval = tor->info.pieceCount / (float)tabCount;
1694    isSeed = tor && ( tr_cpGetStatus ( &tor->completion ) == TR_SEED );
1695    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1696    peerCount = tr_ptrArraySize( &t->peers );
1697
1698    memset( tab, 0, tabCount );
1699
1700    for( i = 0; tor && i < tabCount; ++i )
1701    {
1702        const int piece = i * interval;
1703
1704        if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
1705            tab[i] = -1;
1706        else if( peerCount ) {
1707            int j;
1708            for( j = 0; j < peerCount; ++j )
1709                if( tr_bitfieldHas( peers[j]->have, i ) )
1710                    ++tab[i];
1711        }
1712    }
1713
1714    tr_torrentUnlock( tor );
1715}
1716
1717/* Returns the pieces that are available from peers */
1718tr_bitfield*
1719tr_peerMgrGetAvailable( const tr_torrent * tor )
1720{
1721    int i;
1722    int peerCount;
1723    Torrent * t = tor->torrentPeers;
1724    const tr_peer ** peers;
1725    tr_bitfield * pieces;
1726    managerLock( t->manager );
1727
1728    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1729    peerCount = tr_ptrArraySize( &t->peers );
1730    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1731    for( i=0; i<peerCount; ++i )
1732        tr_bitfieldOr( pieces, peers[i]->have );
1733
1734    managerUnlock( t->manager );
1735    return pieces;
1736}
1737
1738void
1739tr_peerMgrTorrentStats( tr_torrent       * tor,
1740                        int              * setmePeersKnown,
1741                        int              * setmePeersConnected,
1742                        int              * setmeSeedsConnected,
1743                        int              * setmeWebseedsSendingToUs,
1744                        int              * setmePeersSendingToUs,
1745                        int              * setmePeersGettingFromUs,
1746                        int              * setmePeersFrom )
1747{
1748    int i, size;
1749    const Torrent * t = tor->torrentPeers;
1750    const tr_peer ** peers;
1751    const tr_webseed ** webseeds;
1752
1753    managerLock( t->manager );
1754
1755    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1756    size = tr_ptrArraySize( &t->peers );
1757
1758    *setmePeersKnown           = tr_ptrArraySize( &t->pool );
1759    *setmePeersConnected       = 0;
1760    *setmeSeedsConnected       = 0;
1761    *setmePeersGettingFromUs   = 0;
1762    *setmePeersSendingToUs     = 0;
1763    *setmeWebseedsSendingToUs  = 0;
1764
1765    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1766        setmePeersFrom[i] = 0;
1767
1768    for( i=0; i<size; ++i )
1769    {
1770        const tr_peer * peer = peers[i];
1771        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1772
1773        if( peer->io == NULL ) /* not connected */
1774            continue;
1775
1776        ++*setmePeersConnected;
1777
1778        ++setmePeersFrom[atom->from];
1779
1780        if( clientIsDownloadingFrom( peer ) )
1781            ++*setmePeersSendingToUs;
1782
1783        if( clientIsUploadingTo( peer ) )
1784            ++*setmePeersGettingFromUs;
1785
1786        if( atom->flags & ADDED_F_SEED_FLAG )
1787            ++*setmeSeedsConnected;
1788    }
1789
1790    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1791    size = tr_ptrArraySize( &t->webseeds );
1792    for( i=0; i<size; ++i )
1793        if( tr_webseedIsActive( webseeds[i] ) )
1794            ++*setmeWebseedsSendingToUs;
1795
1796    managerUnlock( t->manager );
1797}
1798
1799float*
1800tr_peerMgrWebSpeeds( const tr_torrent * tor )
1801{
1802    const Torrent * t = tor->torrentPeers;
1803    const tr_webseed ** webseeds;
1804    int i;
1805    int webseedCount;
1806    float * ret;
1807    uint64_t now;
1808
1809    assert( t->manager );
1810    managerLock( t->manager );
1811
1812    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1813    webseedCount = tr_ptrArraySize( &t->webseeds );
1814    assert( webseedCount == tor->info.webseedCount );
1815    ret = tr_new0( float, webseedCount );
1816    now = tr_date( );
1817
1818    for( i=0; i<webseedCount; ++i )
1819        if( !tr_webseedGetSpeed( webseeds[i], now, &ret[i] ) )
1820            ret[i] = -1.0;
1821
1822    managerUnlock( t->manager );
1823    return ret;
1824}
1825
1826double
1827tr_peerGetPieceSpeed( const tr_peer * peer, uint64_t now, tr_direction direction )
1828{
1829    return peer->io ? tr_peerIoGetPieceSpeed( peer->io, now, direction ) : 0.0;
1830}
1831
1832
1833struct tr_peer_stat *
1834tr_peerMgrPeerStats( const tr_torrent    * tor,
1835                     int                 * setmeCount )
1836{
1837    int i, size;
1838    const Torrent * t = tor->torrentPeers;
1839    const tr_peer ** peers;
1840    tr_peer_stat * ret;
1841    uint64_t now;
1842
1843    assert( t->manager );
1844    managerLock( t->manager );
1845
1846    size = tr_ptrArraySize( &t->peers );
1847    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1848    ret = tr_new0( tr_peer_stat, size );
1849    now = tr_date( );
1850
1851    for( i = 0; i < size; ++i )
1852    {
1853        char *                   pch;
1854        const tr_peer *          peer = peers[i];
1855        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1856        tr_peer_stat *           stat = ret + i;
1857        tr_address               norm_addr;
1858
1859        norm_addr = peer->addr;
1860        tr_normalizeV4Mapped( &norm_addr );
1861        tr_ntop( &norm_addr, stat->addr, sizeof( stat->addr ) );
1862        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
1863                   sizeof( stat->client ) );
1864        stat->port               = ntohs( peer->port );
1865        stat->from               = atom->from;
1866        stat->progress           = peer->progress;
1867        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1868        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER );
1869        stat->rateToClient       = tr_peerGetPieceSpeed( peer, now, TR_PEER_TO_CLIENT );
1870        stat->peerIsChoked       = peer->peerIsChoked;
1871        stat->peerIsInterested   = peer->peerIsInterested;
1872        stat->clientIsChoked     = peer->clientIsChoked;
1873        stat->clientIsInterested = peer->clientIsInterested;
1874        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1875        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1876        stat->isUploadingTo      = clientIsUploadingTo( peer );
1877        stat->isSeed             = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
1878
1879        pch = stat->flagStr;
1880        if( t->optimistic == peer ) *pch++ = 'O';
1881        if( stat->isDownloadingFrom ) *pch++ = 'D';
1882        else if( stat->clientIsInterested ) *pch++ = 'd';
1883        if( stat->isUploadingTo ) *pch++ = 'U';
1884        else if( stat->peerIsInterested ) *pch++ = 'u';
1885        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
1886        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1887        if( stat->isEncrypted ) *pch++ = 'E';
1888        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1889        if( stat->isIncoming ) *pch++ = 'I';
1890        *pch = '\0';
1891    }
1892
1893    *setmeCount = size;
1894
1895    managerUnlock( t->manager );
1896    return ret;
1897}
1898
1899/**
1900***
1901**/
1902
1903struct ChokeData
1904{
1905    tr_bool         doUnchoke;
1906    tr_bool         isInterested;
1907    tr_bool         isChoked;
1908    int             rate;
1909    tr_peer *       peer;
1910};
1911
1912static int
1913compareChoke( const void * va,
1914              const void * vb )
1915{
1916    const struct ChokeData * a = va;
1917    const struct ChokeData * b = vb;
1918
1919    if( a->rate != b->rate ) /* prefer higher overall speeds */
1920        return a->rate > b->rate ? -1 : 1;
1921
1922    if( a->isChoked != b->isChoked ) /* prefer unchoked */
1923        return a->isChoked ? 1 : -1;
1924
1925    return 0;
1926}
1927
1928static int
1929isNew( const tr_peer * peer )
1930{
1931    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1932}
1933
1934static int
1935isSame( const tr_peer * peer )
1936{
1937    return peer && peer->client && strstr( peer->client, "Transmission" );
1938}
1939
1940/**
1941***
1942**/
1943
1944static void
1945rechokeTorrent( Torrent * t )
1946{
1947    int i, size, unchokedInterested;
1948    const int peerCount = tr_ptrArraySize( &t->peers );
1949    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1950    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1951    const tr_session * session = t->manager->session;
1952    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
1953    const uint64_t now = tr_date( );
1954
1955    assert( torrentIsLocked( t ) );
1956
1957    /* sort the peers by preference and rate */
1958    for( i = 0, size = 0; i < peerCount; ++i )
1959    {
1960        tr_peer * peer = peers[i];
1961        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1962
1963        if( peer->progress >= 1.0 ) /* choke all seeds */
1964        {
1965            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1966        }
1967        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
1968        {
1969            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1970        }
1971        else if( chokeAll ) /* choke everyone if we're not uploading */
1972        {
1973            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1974        }
1975        else
1976        {
1977            struct ChokeData * n = &choke[size++];
1978            n->peer         = peer;
1979            n->isInterested = peer->peerIsInterested;
1980            n->isChoked     = peer->peerIsChoked;
1981            n->rate         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER ) * 1024;
1982        }
1983    }
1984
1985    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
1986
1987    /**
1988     * Reciprocation and number of uploads capping is managed by unchoking
1989     * the N peers which have the best upload rate and are interested.
1990     * This maximizes the client's download rate. These N peers are
1991     * referred to as downloaders, because they are interested in downloading
1992     * from the client.
1993     *
1994     * Peers which have a better upload rate (as compared to the downloaders)
1995     * but aren't interested get unchoked. If they become interested, the
1996     * downloader with the worst upload rate gets choked. If a client has
1997     * a complete file, it uses its upload rate rather than its download
1998     * rate to decide which peers to unchoke.
1999     */
2000    unchokedInterested = 0;
2001    for( i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i ) {
2002        choke[i].doUnchoke = 1;
2003        if( choke[i].isInterested )
2004            ++unchokedInterested;
2005    }
2006
2007    /* optimistic unchoke */
2008    if( i < size )
2009    {
2010        int n;
2011        struct ChokeData * c;
2012        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
2013
2014        for( ; i<size; ++i )
2015        {
2016            if( choke[i].isInterested )
2017            {
2018                const tr_peer * peer = choke[i].peer;
2019                int x = 1, y;
2020                if( isNew( peer ) ) x *= 3;
2021                if( isSame( peer ) ) x *= 3;
2022                for( y=0; y<x; ++y )
2023                    tr_ptrArrayAppend( &randPool, &choke[i] );
2024            }
2025        }
2026
2027        if(( n = tr_ptrArraySize( &randPool )))
2028        {
2029            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
2030            c->doUnchoke = 1;
2031            t->optimistic = c->peer;
2032        }
2033
2034        tr_ptrArrayDestruct( &randPool, NULL );
2035    }
2036
2037    for( i=0; i<size; ++i )
2038        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
2039
2040    /* cleanup */
2041    tr_free( choke );
2042}
2043
2044static int
2045rechokePulse( void * vmgr )
2046{
2047    tr_torrent * tor = NULL;
2048    tr_peerMgr * mgr = vmgr;
2049    managerLock( mgr );
2050
2051    while(( tor = tr_torrentNext( mgr->session, tor )))
2052        if( tor->isRunning )
2053            rechokeTorrent( tor->torrentPeers );
2054
2055    managerUnlock( mgr );
2056    return TRUE;
2057}
2058
2059/***
2060****
2061****  Life and Death
2062****
2063***/
2064
2065typedef enum
2066{
2067    TR_CAN_KEEP,
2068    TR_CAN_CLOSE,
2069    TR_MUST_CLOSE,
2070}
2071tr_close_type_t;
2072
2073static tr_close_type_t
2074shouldPeerBeClosed( const Torrent    * t,
2075                    const tr_peer    * peer,
2076                    int                peerCount )
2077{
2078    const tr_torrent *       tor = t->tor;
2079    const time_t             now = time( NULL );
2080    const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
2081
2082    /* if it's marked for purging, close it */
2083    if( peer->doPurge )
2084    {
2085        tordbg( t, "purging peer %s because its doPurge flag is set",
2086                tr_peerIoAddrStr( &atom->addr, atom->port ) );
2087        return TR_MUST_CLOSE;
2088    }
2089
2090    /* if we're seeding and the peer has everything we have,
2091     * and enough time has passed for a pex exchange, then disconnect */
2092    if( tr_torrentIsSeed( tor ) )
2093    {
2094        int peerHasEverything;
2095        if( atom->flags & ADDED_F_SEED_FLAG )
2096            peerHasEverything = TRUE;
2097        else if( peer->progress < tr_cpPercentDone( &tor->completion ) )
2098            peerHasEverything = FALSE;
2099        else {
2100            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( &tor->completion ) );
2101            tr_bitfieldDifference( tmp, peer->have );
2102            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2103            tr_bitfieldFree( tmp );
2104        }
2105
2106        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 )))
2107        {
2108            tordbg( t, "purging peer %s because we're both seeds",
2109                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
2110            return TR_MUST_CLOSE;
2111        }
2112    }
2113
2114    /* disconnect if it's been too long since piece data has been transferred.
2115     * this is on a sliding scale based on number of available peers... */
2116    {
2117        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2118        /* if we have >= relaxIfFewerThan, strictness is 100%.
2119         * if we have zero connections, strictness is 0% */
2120        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
2121                               ? 1.0
2122                               : peerCount / (float)relaxStrictnessIfFewerThanN;
2123        const int lo = MIN_UPLOAD_IDLE_SECS;
2124        const int hi = MAX_UPLOAD_IDLE_SECS;
2125        const int limit = hi - ( ( hi - lo ) * strictness );
2126        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
2127/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
2128        if( idleTime > limit ) {
2129            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
2130                       tr_peerIoAddrStr( &atom->addr, atom->port ), idleTime );
2131            return TR_CAN_CLOSE;
2132        }
2133    }
2134
2135    return TR_CAN_KEEP;
2136}
2137
2138static tr_peer **
2139getPeersToClose( Torrent * t, tr_close_type_t closeType, int * setmeSize )
2140{
2141    int i, peerCount, outsize;
2142    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
2143    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2144
2145    assert( torrentIsLocked( t ) );
2146
2147    for( i = outsize = 0; i < peerCount; ++i )
2148        if( shouldPeerBeClosed( t, peers[i], peerCount ) == closeType )
2149            ret[outsize++] = peers[i];
2150
2151    *setmeSize = outsize;
2152    return ret;
2153}
2154
2155static int
2156compareCandidates( const void * va,
2157                   const void * vb )
2158{
2159    const struct peer_atom * a = *(const struct peer_atom**) va;
2160    const struct peer_atom * b = *(const struct peer_atom**) vb;
2161
2162    /* <Charles> Here we would probably want to try reconnecting to
2163     * peers that had most recently given us data. Lots of users have
2164     * trouble with resets due to their routers and/or ISPs. This way we
2165     * can quickly recover from an unwanted reset. So we sort
2166     * piece_data_time in descending order.
2167     */
2168
2169    if( a->piece_data_time != b->piece_data_time )
2170        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2171
2172    if( a->numFails != b->numFails )
2173        return a->numFails < b->numFails ? -1 : 1;
2174
2175    if( a->time != b->time )
2176        return a->time < b->time ? -1 : 1;
2177
2178    /* all other things being equal, prefer peers whose
2179     * information comes from a more reliable source */
2180    if( a->from != b->from )
2181        return a->from < b->from ? -1 : 1;
2182
2183    return 0;
2184}
2185
2186static int
2187getReconnectIntervalSecs( const struct peer_atom * atom )
2188{
2189    int          sec;
2190    const time_t now = time( NULL );
2191
2192    /* if we were recently connected to this peer and transferring piece
2193     * data, try to reconnect to them sooner rather that later -- we don't
2194     * want network troubles to get in the way of a good peer. */
2195    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2196        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2197
2198    /* don't allow reconnects more often than our minimum */
2199    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2200        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2201
2202    /* otherwise, the interval depends on how many times we've tried
2203     * and failed to connect to the peer */
2204    else switch( atom->numFails ) {
2205        case 0: sec = 0; break;
2206        case 1: sec = 5; break;
2207        case 2: sec = 2 * 60; break;
2208        case 3: sec = 15 * 60; break;
2209        case 4: sec = 30 * 60; break;
2210        case 5: sec = 60 * 60; break;
2211        default: sec = 120 * 60; break;
2212    }
2213
2214    return sec;
2215}
2216
2217static struct peer_atom **
2218getPeerCandidates( Torrent * t, int * setmeSize )
2219{
2220    int                 i, atomCount, retCount;
2221    struct peer_atom ** atoms;
2222    struct peer_atom ** ret;
2223    const time_t        now = time( NULL );
2224    const int           seed = tr_torrentIsSeed( t->tor );
2225
2226    assert( torrentIsLocked( t ) );
2227
2228    atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
2229    ret = tr_new( struct peer_atom*, atomCount );
2230    for( i = retCount = 0; i < atomCount; ++i )
2231    {
2232        int                interval;
2233        struct peer_atom * atom = atoms[i];
2234
2235        /* peer fed us too much bad data ... we only keep it around
2236         * now to weed it out in case someone sends it to us via pex */
2237        if( atom->myflags & MYFLAG_BANNED )
2238            continue;
2239
2240        /* peer was unconnectable before, so we're not going to keep trying.
2241         * this is needs a separate flag from `banned', since if they try
2242         * to connect to us later, we'll let them in */
2243        if( atom->myflags & MYFLAG_UNREACHABLE )
2244            continue;
2245
2246        /* we don't need two connections to the same peer... */
2247        if( peerIsInUse( t, &atom->addr ) )
2248            continue;
2249
2250        /* no need to connect if we're both seeds... */
2251        if( seed && ( ( atom->flags & ADDED_F_SEED_FLAG ) ||
2252                      ( atom->uploadOnly == UPLOAD_ONLY_YES ) ) )
2253            continue;
2254
2255        /* don't reconnect too often */
2256        interval = getReconnectIntervalSecs( atom );
2257        if( ( now - atom->time ) < interval )
2258        {
2259            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2260                    i, tr_peerIoAddrStr( &atom->addr, atom->port ), interval );
2261            continue;
2262        }
2263
2264        /* Don't connect to peers in our blocklist */
2265        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2266            continue;
2267
2268        ret[retCount++] = atom;
2269    }
2270
2271    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2272    *setmeSize = retCount;
2273    return ret;
2274}
2275
2276static void
2277closePeer( Torrent * t, tr_peer * peer )
2278{
2279    struct peer_atom * atom;
2280
2281    assert( t != NULL );
2282    assert( peer != NULL );
2283
2284    /* if we transferred piece data, then they might be good peers,
2285       so reset their `numFails' weight to zero.  otherwise we connected
2286       to them fruitlessly, so mark it as another fail */
2287    atom = getExistingAtom( t, &peer->addr );
2288    if( atom->piece_data_time )
2289        atom->numFails = 0;
2290    else
2291        ++atom->numFails;
2292
2293    tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
2294    removePeer( t, peer );
2295}
2296
2297static void
2298reconnectTorrent( Torrent * t )
2299{
2300    static time_t prevTime = 0;
2301    static int    newConnectionsThisSecond = 0;
2302    time_t        now;
2303
2304    now = time( NULL );
2305    if( prevTime != now )
2306    {
2307        prevTime = now;
2308        newConnectionsThisSecond = 0;
2309    }
2310
2311    if( !t->isRunning )
2312    {
2313        removeAllPeers( t );
2314    }
2315    else
2316    {
2317        int i;
2318        int canCloseCount;
2319        int mustCloseCount;
2320        int candidateCount;
2321        int maxCandidates;
2322        struct tr_peer ** canClose = getPeersToClose( t, TR_CAN_CLOSE, &canCloseCount );
2323        struct tr_peer ** mustClose = getPeersToClose( t, TR_MUST_CLOSE, &mustCloseCount );
2324        struct peer_atom ** candidates = getPeerCandidates( t, &candidateCount );
2325
2326        tordbg( t, "reconnect pulse for [%s]: "
2327                   "%d must-close connections, "
2328                   "%d can-close connections, "
2329                   "%d connection candidates, "
2330                   "%d atoms, "
2331                   "max per pulse is %d",
2332                   t->tor->info.name,
2333                   mustCloseCount,
2334                   canCloseCount,
2335                   candidateCount,
2336                   tr_ptrArraySize( &t->pool ),
2337                   MAX_RECONNECTIONS_PER_PULSE );
2338
2339        /* disconnect the really bad peers */
2340        for( i=0; i<mustCloseCount; ++i )
2341            closePeer( t, mustClose[i] );
2342
2343        /* decide how many peers can we try to add in this pass */
2344        maxCandidates = candidateCount;
2345        maxCandidates = MIN( maxCandidates, MAX_RECONNECTIONS_PER_PULSE );
2346        maxCandidates = MIN( maxCandidates, getMaxPeerCount( t->tor ) - getPeerCount( t ) );
2347        maxCandidates = MIN( maxCandidates, MAX_CONNECTIONS_PER_SECOND - newConnectionsThisSecond );
2348
2349        /* maybe disconnect some lesser peers, if we have candidates to replace them with */
2350        for( i=0; ( i<canCloseCount ) && ( i<maxCandidates ); ++i )
2351            closePeer( t, canClose[i] );
2352
2353        tordbg( t, "candidateCount is %d, MAX_RECONNECTIONS_PER_PULSE is %d,"
2354                   " getPeerCount(t) is %d, getMaxPeerCount(t) is %d, "
2355                   "newConnectionsThisSecond is %d, MAX_CONNECTIONS_PER_SECOND is %d",
2356                   candidateCount,
2357                   MAX_RECONNECTIONS_PER_PULSE,
2358                   getPeerCount( t ),
2359                   getMaxPeerCount( t->tor ),
2360                   newConnectionsThisSecond, MAX_CONNECTIONS_PER_SECOND );
2361
2362        /* add some new ones */
2363        for( i=0; i<maxCandidates; ++i )
2364        {
2365            tr_peerMgr        * mgr = t->manager;
2366            struct peer_atom  * atom = candidates[i];
2367            tr_peerIo         * io;
2368
2369            tordbg( t, "Starting an OUTGOING connection with %s",
2370                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2371
2372            io = tr_peerIoNewOutgoing( mgr->session, mgr->session->bandwidth, &atom->addr, atom->port, t->hash );
2373
2374            if( io == NULL )
2375            {
2376                tordbg( t, "peerIo not created; marking peer %s as unreachable",
2377                        tr_peerIoAddrStr( &atom->addr, atom->port ) );
2378                atom->myflags |= MYFLAG_UNREACHABLE;
2379            }
2380            else
2381            {
2382                tr_handshake * handshake = tr_handshakeNew( io,
2383                                                            mgr->session->encryptionMode,
2384                                                            myHandshakeDoneCB,
2385                                                            mgr );
2386
2387                assert( tr_peerIoGetTorrentHash( io ) );
2388
2389                tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewOutgoing() */
2390
2391                ++newConnectionsThisSecond;
2392
2393                tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
2394                                         handshakeCompare );
2395            }
2396
2397            atom->time = time( NULL );
2398        }
2399
2400        /* cleanup */
2401        tr_free( candidates );
2402        tr_free( mustClose );
2403        tr_free( canClose );
2404    }
2405}
2406
2407static int
2408reconnectPulse( void * vmgr )
2409{
2410    tr_torrent * tor = NULL;
2411    tr_peerMgr * mgr = vmgr;
2412    managerLock( mgr );
2413
2414    while(( tor = tr_torrentNext( mgr->session, tor )))
2415        if( tor->isRunning )
2416            reconnectTorrent( tor->torrentPeers );
2417
2418    managerUnlock( mgr );
2419    return TRUE;
2420}
2421
2422/****
2423*****
2424*****  BANDWIDTH ALLOCATION
2425*****
2426****/
2427
2428static void
2429pumpAllPeers( tr_peerMgr * mgr )
2430{
2431    tr_torrent * tor = NULL;
2432
2433    while(( tor = tr_torrentNext( mgr->session, tor )))
2434    {
2435        int j;
2436        Torrent * t = tor->torrentPeers;
2437
2438        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
2439        {
2440            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
2441            tr_peerMsgsPulse( peer->msgs );
2442        }
2443    }
2444}
2445
2446static int
2447bandwidthPulse( void * vmgr )
2448{
2449    tr_peerMgr * mgr = vmgr;
2450    managerLock( mgr );
2451
2452    /* FIXME: this next line probably isn't necessary... */
2453    pumpAllPeers( mgr );
2454
2455    /* allocate bandwidth to the peers */
2456    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
2457    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
2458
2459    managerUnlock( mgr );
2460    return TRUE;
2461}
Note: See TracBrowser for help on using the repository browser.