source: trunk/libtransmission/peer-mgr.c @ 7909

Last change on this file since 7909 was 7909, checked in by charles, 13 years ago

(trunk libT) experimental fix for #1829: High CPU use in refillPulse()

  • Property svn:keywords set to Date Rev Author Id
File size: 70.4 KB
Line 
1
2/*
3 * This file Copyright (C) 2007-2009 Charles Kerr <charles@transmissionbt.com>
4 *
5 * This file is licensed by the GPL version 2.  Works owned by the
6 * Transmission project are granted a special exemption to clause 2(b)
7 * so that the bulk of its code can remain under the MIT license.
8 * This exemption does not extend to derived works not owned by
9 * the Transmission project.
10 *
11 * $Id: peer-mgr.c 7909 2009-02-18 17:19:36Z charles $
12 */
13
14#include <assert.h>
15#include <errno.h>
16#include <string.h> /* memcpy, memcmp, strstr */
17#include <stdlib.h> /* qsort */
18#include <limits.h> /* INT_MAX */
19
20#include <event.h>
21
22#include "transmission.h"
23#include "session.h"
24#include "bandwidth.h"
25#include "bencode.h"
26#include "blocklist.h"
27#include "clients.h"
28#include "completion.h"
29#include "crypto.h"
30#include "fdlimit.h"
31#include "handshake.h"
32#include "inout.h" /* tr_ioTestPiece */
33#include "net.h"
34#include "peer-io.h"
35#include "peer-mgr.h"
36#include "peer-msgs.h"
37#include "ptrarray.h"
38#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
39#include "torrent.h"
40#include "trevent.h"
41#include "utils.h"
42#include "webseed.h"
43
44enum
45{
46    /* how frequently to change which peers are choked */
47    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
48
49    /* minimum interval for refilling peers' request lists */
50    REFILL_PERIOD_MSEC = 400,
51   
52    /* how frequently to reallocate bandwidth */
53    BANDWIDTH_PERIOD_MSEC = 500,
54
55    /* how frequently to age out old piece request lists */
56    REFILL_UPKEEP_PERIOD_MSEC = 1000,
57
58    /* how frequently to decide which peers live and die */
59    RECONNECT_PERIOD_MSEC = 500,
60
61    /* when many peers are available, keep idle ones this long */
62    MIN_UPLOAD_IDLE_SECS = ( 30 ),
63
64    /* when few peers are available, keep idle ones this long */
65    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
66
67    /* max # of peers to ask fer per torrent per reconnect pulse */
68    MAX_RECONNECTIONS_PER_PULSE = 16,
69
70    /* max number of peers to ask for per second overall.
71    * this throttle is to avoid overloading the router */
72    MAX_CONNECTIONS_PER_SECOND = 32,
73
74    /* number of bad pieces a peer is allowed to send before we ban them */
75    MAX_BAD_PIECES_PER_PEER = 5,
76
77    /* amount of time to keep a list of request pieces lying around
78       before it's considered too old and needs to be rebuilt */
79    PIECE_LIST_SHELF_LIFE_SECS = 60,
80
81    /* use for bitwise operations w/peer_atom.myflags */
82    MYFLAG_BANNED = 1,
83
84    /* use for bitwise operations w/peer_atom.myflags */
85    /* unreachable for now... but not banned.
86     * if they try to connect to us it's okay */
87    MYFLAG_UNREACHABLE = 2,
88
89    /* the minimum we'll wait before attempting to reconnect to a peer */
90    MINIMUM_RECONNECT_INTERVAL_SECS = 5
91};
92
93
94/**
95***
96**/
97
98enum
99{
100    UPLOAD_ONLY_UKNOWN,
101    UPLOAD_ONLY_YES,
102    UPLOAD_ONLY_NO
103};
104
105/**
106 * Peer information that should be kept even before we've connected and
107 * after we've disconnected.  These are kept in a pool of peer_atoms to decide
108 * which ones would make good candidates for connecting to, and to watch out
109 * for banned peers.
110 *
111 * @see tr_peer
112 * @see tr_peermsgs
113 */
114struct peer_atom
115{
116    uint8_t     from;
117    uint8_t     flags;       /* these match the added_f flags */
118    uint8_t     myflags;     /* flags that aren't defined in added_f */
119    uint8_t     uploadOnly;  /* UPLOAD_ONLY_ */
120    tr_port     port;
121    uint16_t    numFails;
122    tr_address  addr;
123    time_t      time;        /* when the peer's connection status last changed */
124    time_t      piece_data_time;
125};
126
127struct tr_blockIterator
128{
129    time_t expirationDate;
130    struct tr_torrent_peers * t;
131    tr_block_index_t blockIndex, blockCount, *blocks;
132    tr_piece_index_t pieceIndex, pieceCount, *pieces;
133};
134
135typedef struct tr_torrent_peers
136{
137    tr_bool                    isRunning;
138
139    uint8_t                    hash[SHA_DIGEST_LENGTH];
140    int                      * pendingRequestCount;
141    tr_ptrArray                outgoingHandshakes; /* tr_handshake */
142    tr_ptrArray                pool; /* struct peer_atom */
143    tr_ptrArray                peers; /* tr_peer */
144    tr_ptrArray                webseeds; /* tr_webseed */
145    tr_timer                 * refillTimer;
146    tr_torrent               * tor;
147    tr_peer                  * optimistic; /* the optimistic peer, or NULL if none */
148    struct tr_blockIterator  * refillQueue; /* used in refillPulse() */
149
150    struct tr_peerMgr        * manager;
151}
152Torrent;
153
154struct tr_peerMgr
155{
156    tr_session      * session;
157    tr_ptrArray       incomingHandshakes; /* tr_handshake */
158    tr_timer        * bandwidthTimer;
159    tr_timer        * rechokeTimer;
160    tr_timer        * reconnectTimer;
161    tr_timer        * refillUpkeepTimer;
162};
163
164#define tordbg( t, ... ) \
165    do { \
166        if( tr_deepLoggingIsActive( ) ) \
167            tr_deepLog( __FILE__, __LINE__, t->tor->info.name, __VA_ARGS__ ); \
168    } while( 0 )
169
170#define dbgmsg( ... ) \
171    do { \
172        if( tr_deepLoggingIsActive( ) ) \
173            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
174    } while( 0 )
175
176/**
177***
178**/
179
180static TR_INLINE void
181managerLock( const struct tr_peerMgr * manager )
182{
183    tr_globalLock( manager->session );
184}
185
186static TR_INLINE void
187managerUnlock( const struct tr_peerMgr * manager )
188{
189    tr_globalUnlock( manager->session );
190}
191
192static TR_INLINE void
193torrentLock( Torrent * torrent )
194{
195    managerLock( torrent->manager );
196}
197
198static TR_INLINE void
199torrentUnlock( Torrent * torrent )
200{
201    managerUnlock( torrent->manager );
202}
203
204static TR_INLINE int
205torrentIsLocked( const Torrent * t )
206{
207    return tr_globalIsLocked( t->manager->session );
208}
209
210/**
211***
212**/
213
214static int
215handshakeCompareToAddr( const void * va, const void * vb )
216{
217    const tr_handshake * a = va;
218
219    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
220}
221
222static int
223handshakeCompare( const void * a, const void * b )
224{
225    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
226}
227
228static tr_handshake*
229getExistingHandshake( tr_ptrArray      * handshakes,
230                      const tr_address * addr )
231{
232    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
233}
234
235static int
236comparePeerAtomToAddress( const void * va, const void * vb )
237{
238    const struct peer_atom * a = va;
239
240    return tr_compareAddresses( &a->addr, vb );
241}
242
243static int
244comparePeerAtoms( const void * va, const void * vb )
245{
246    const struct peer_atom * b = vb;
247
248    return comparePeerAtomToAddress( va, &b->addr );
249}
250
251/**
252***
253**/
254
255static Torrent*
256getExistingTorrent( tr_peerMgr *    manager,
257                    const uint8_t * hash )
258{
259    tr_torrent * tor = tr_torrentFindFromHash( manager->session, hash );
260
261    return tor == NULL ? NULL : tor->torrentPeers;
262}
263
264static int
265peerCompare( const void * va, const void * vb )
266{
267    const tr_peer * a = va;
268    const tr_peer * b = vb;
269
270    return tr_compareAddresses( &a->addr, &b->addr );
271}
272
273static int
274peerCompareToAddr( const void * va, const void * vb )
275{
276    const tr_peer * a = va;
277
278    return tr_compareAddresses( &a->addr, vb );
279}
280
281static tr_peer*
282getExistingPeer( Torrent          * torrent,
283                 const tr_address * addr )
284{
285    assert( torrentIsLocked( torrent ) );
286    assert( addr );
287
288    return tr_ptrArrayFindSorted( &torrent->peers, addr, peerCompareToAddr );
289}
290
291static struct peer_atom*
292getExistingAtom( const Torrent    * t,
293                 const tr_address * addr )
294{
295    Torrent * tt = (Torrent*)t;
296    assert( torrentIsLocked( t ) );
297    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
298}
299
300static tr_bool
301peerIsInUse( const Torrent    * ct,
302             const tr_address * addr )
303{
304    Torrent * t = (Torrent*) ct;
305
306    assert( torrentIsLocked ( t ) );
307
308    return getExistingPeer( t, addr )
309        || getExistingHandshake( &t->outgoingHandshakes, addr )
310        || getExistingHandshake( &t->manager->incomingHandshakes, addr );
311}
312
313static tr_peer*
314peerConstructor( const tr_address * addr )
315{
316    tr_peer * p;
317    p = tr_new0( tr_peer, 1 );
318    p->addr = *addr;
319    return p;
320}
321
322static tr_peer*
323getPeer( Torrent          * torrent,
324         const tr_address * addr )
325{
326    tr_peer * peer;
327
328    assert( torrentIsLocked( torrent ) );
329
330    peer = getExistingPeer( torrent, addr );
331
332    if( peer == NULL )
333    {
334        peer = peerConstructor( addr );
335        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
336    }
337
338    return peer;
339}
340
341static void
342peerDestructor( tr_peer * peer )
343{
344    assert( peer );
345
346    if( peer->msgs != NULL )
347    {
348        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
349        tr_peerMsgsFree( peer->msgs );
350    }
351
352    tr_peerIoClear( peer->io );
353    tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
354
355    tr_bitfieldFree( peer->have );
356    tr_bitfieldFree( peer->blame );
357    tr_free( peer->client );
358
359    tr_free( peer );
360}
361
362static void
363removePeer( Torrent * t,
364            tr_peer * peer )
365{
366    tr_peer *          removed;
367    struct peer_atom * atom;
368
369    assert( torrentIsLocked( t ) );
370
371    atom = getExistingAtom( t, &peer->addr );
372    assert( atom );
373    atom->time = time( NULL );
374
375    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
376    assert( removed == peer );
377    peerDestructor( removed );
378}
379
380static void
381removeAllPeers( Torrent * t )
382{
383    while( !tr_ptrArrayEmpty( &t->peers ) )
384        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
385}
386
387static void blockIteratorFree( struct tr_blockIterator ** inout );
388
389static void
390torrentDestructor( void * vt )
391{
392    Torrent * t = vt;
393    uint8_t   hash[SHA_DIGEST_LENGTH];
394
395    assert( t );
396    assert( !t->isRunning );
397    assert( torrentIsLocked( t ) );
398    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
399    assert( tr_ptrArrayEmpty( &t->peers ) );
400
401    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
402
403    tr_timerFree( &t->refillTimer );
404
405    blockIteratorFree( &t->refillQueue );
406    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
407    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
408    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
409    tr_ptrArrayDestruct( &t->peers, NULL );
410
411    tr_free( t->pendingRequestCount );
412    tr_free( t );
413}
414
415static void peerCallbackFunc( void * vpeer,
416                              void * vevent,
417                              void * vt );
418
419static Torrent*
420torrentConstructor( tr_peerMgr * manager,
421                    tr_torrent * tor )
422{
423    int       i;
424    Torrent * t;
425
426    t = tr_new0( Torrent, 1 );
427    t->manager = manager;
428    t->tor = tor;
429    t->pool = TR_PTR_ARRAY_INIT;
430    t->peers = TR_PTR_ARRAY_INIT;
431    t->webseeds = TR_PTR_ARRAY_INIT;
432    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
433    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
434
435    for( i = 0; i < tor->info.webseedCount; ++i )
436    {
437        tr_webseed * w =
438            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
439        tr_ptrArrayAppend( &t->webseeds, w );
440    }
441
442    return t;
443}
444
445
446static int bandwidthPulse ( void * vmgr );
447static int rechokePulse   ( void * vmgr );
448static int reconnectPulse ( void * vmgr );
449static int refillUpkeep   ( void * vmgr );
450
451tr_peerMgr*
452tr_peerMgrNew( tr_session * session )
453{
454    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
455
456    m->session = session;
457    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
458    m->bandwidthTimer    = tr_timerNew( session, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
459    m->rechokeTimer      = tr_timerNew( session, rechokePulse,   m, RECHOKE_PERIOD_MSEC );
460    m->reconnectTimer    = tr_timerNew( session, reconnectPulse, m, RECONNECT_PERIOD_MSEC );
461    m->refillUpkeepTimer = tr_timerNew( session, refillUpkeep,   m, REFILL_UPKEEP_PERIOD_MSEC );
462
463    rechokePulse( m );
464
465    return m;
466}
467
468void
469tr_peerMgrFree( tr_peerMgr * manager )
470{
471    managerLock( manager );
472
473    tr_timerFree( &manager->refillUpkeepTimer );
474    tr_timerFree( &manager->reconnectTimer );
475    tr_timerFree( &manager->rechokeTimer );
476    tr_timerFree( &manager->bandwidthTimer );
477
478    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
479     * the item from manager->handshakes, so this is a little roundabout... */
480    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
481        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
482
483    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
484
485    managerUnlock( manager );
486    tr_free( manager );
487}
488
489static int
490clientIsDownloadingFrom( const tr_peer * peer )
491{
492    return peer->clientIsInterested && !peer->clientIsChoked;
493}
494
495static int
496clientIsUploadingTo( const tr_peer * peer )
497{
498    return peer->peerIsInterested && !peer->peerIsChoked;
499}
500
501/***
502****
503***/
504
505tr_bool
506tr_peerMgrPeerIsSeed( const tr_torrent  * tor,
507                      const tr_address  * addr )
508{
509    tr_bool isSeed = FALSE;
510    const Torrent * t = tor->torrentPeers;
511    const struct peer_atom * atom = getExistingAtom( t, addr );
512
513    if( atom )
514        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
515
516    return isSeed;
517}
518
519/****
520*****
521*****  REFILL
522*****
523****/
524
525static void
526assertValidPiece( Torrent * t, tr_piece_index_t piece )
527{
528    assert( t );
529    assert( t->tor );
530    assert( piece < t->tor->info.pieceCount );
531}
532
533static int
534getPieceRequests( Torrent * t, tr_piece_index_t piece )
535{
536    assertValidPiece( t, piece );
537
538    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
539}
540
541static void
542incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
543{
544    assertValidPiece( t, piece );
545
546    if( t->pendingRequestCount == NULL )
547        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
548    t->pendingRequestCount[piece]++;
549}
550
551static void
552decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
553{
554    assertValidPiece( t, piece );
555
556    if( t->pendingRequestCount )
557        t->pendingRequestCount[piece]--;
558}
559
560struct tr_refill_piece
561{
562    tr_priority_t    priority;
563    uint32_t         piece;
564    uint32_t         peerCount;
565    int              random;
566    int              pendingRequestCount;
567    int              missingBlockCount;
568};
569
570static int
571compareRefillPiece( const void * aIn, const void * bIn )
572{
573    const struct tr_refill_piece * a = aIn;
574    const struct tr_refill_piece * b = bIn;
575
576    /* if one piece has a higher priority, it goes first */
577    if( a->priority != b->priority )
578        return a->priority > b->priority ? -1 : 1;
579
580    /* have a per-priority endgame */
581    if( a->pendingRequestCount != b->pendingRequestCount )
582        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
583
584    /* fewer missing pieces goes first */
585    if( a->missingBlockCount != b->missingBlockCount )
586        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
587
588    /* otherwise if one has fewer peers, it goes first */
589    if( a->peerCount != b->peerCount )
590        return a->peerCount < b->peerCount ? -1 : 1;
591
592    /* otherwise go with our random seed */
593    if( a->random != b->random )
594        return a->random < b->random ? -1 : 1;
595
596    return 0;
597}
598
599static tr_piece_index_t *
600getPreferredPieces( Torrent * t, tr_piece_index_t * pieceCount )
601{
602    const tr_torrent  * tor = t->tor;
603    const tr_info     * inf = &tor->info;
604    tr_piece_index_t    i;
605    tr_piece_index_t    poolSize = 0;
606    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
607    int                 peerCount;
608    const tr_peer    ** peers;
609
610    assert( torrentIsLocked( t ) );
611
612    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
613    peerCount = tr_ptrArraySize( &t->peers );
614
615    /* make a list of the pieces that we want but don't have */
616    for( i = 0; i < inf->pieceCount; ++i )
617        if( !tor->info.pieces[i].dnd
618                && !tr_cpPieceIsComplete( &tor->completion, i ) )
619            pool[poolSize++] = i;
620
621    /* sort the pool by which to request next */
622    if( poolSize > 1 )
623    {
624        tr_piece_index_t j;
625        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
626
627        for( j = 0; j < poolSize; ++j )
628        {
629            int k;
630            const tr_piece_index_t piece = pool[j];
631            struct tr_refill_piece * setme = p + j;
632
633            setme->piece = piece;
634            setme->priority = inf->pieces[piece].priority;
635            setme->peerCount = 0;
636            setme->random = tr_cryptoWeakRandInt( INT_MAX );
637            setme->pendingRequestCount = getPieceRequests( t, piece );
638            setme->missingBlockCount
639                         = tr_cpMissingBlocksInPiece( &tor->completion, piece );
640
641            for( k = 0; k < peerCount; ++k )
642            {
643                const tr_peer * peer = peers[k];
644                if( peer->peerIsInterested
645                        && !peer->clientIsChoked
646                        && tr_bitfieldHas( peer->have, piece ) )
647                    ++setme->peerCount;
648            }
649        }
650
651        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
652               compareRefillPiece );
653
654        for( j = 0; j < poolSize; ++j )
655            pool[j] = p[j].piece;
656
657        tr_free( p );
658    }
659
660    *pieceCount = poolSize;
661    return pool;
662}
663
664static struct tr_blockIterator*
665blockIteratorNew( Torrent * t )
666{
667    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
668    tordbg( t, "creating new refill queue" );
669    i->expirationDate = time( NULL ) + PIECE_LIST_SHELF_LIFE_SECS;
670    i->t = t;
671    i->pieces = getPreferredPieces( t, &i->pieceCount );
672    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCountInPiece );
673    return i;
674}
675
676static tr_bool
677blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
678{
679    tr_bool found;
680    Torrent * t = i->t;
681    tr_torrent * tor = t->tor;
682
683    while( ( i->blockIndex == i->blockCount )
684        && ( i->pieceIndex < i->pieceCount ) )
685    {
686        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
687        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
688        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
689        tr_block_index_t block;
690
691        assert( index < tor->info.pieceCount );
692
693        i->blockCount = 0;
694        i->blockIndex = 0;
695        for( block=b; block!=e; ++block )
696            if( !tr_cpBlockIsComplete( &tor->completion, block ) )
697                i->blocks[i->blockCount++] = block;
698    }
699
700    assert( i->blockCount <= tor->blockCountInPiece );
701
702    if(( found = ( i->blockIndex < i->blockCount )))
703        *setme = i->blocks[i->blockIndex++];
704
705    return found;
706}
707
708static void
709blockIteratorFree( struct tr_blockIterator ** inout )
710{
711    struct tr_blockIterator * it = *inout;
712
713    if( it != NULL )
714    {
715        tr_free( it->blocks );
716        tr_free( it->pieces );
717        tr_free( it );
718    }
719
720    *inout = NULL;
721}
722
723static tr_peer**
724getPeersUploadingToClient( Torrent * t,
725                           int *     setmeCount )
726{
727    int j;
728    int peerCount = 0;
729    int retCount = 0;
730    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
731    tr_peer ** ret = tr_new( tr_peer *, peerCount );
732
733    j = 0; /* this is a temporary test to make sure we walk through all the peers */
734    if( peerCount )
735    {
736        /* Get a list of peers we're downloading from.
737           Pick a different starting point each time so all peers
738           get a chance at being the first in line */
739        const int fencepost = tr_cryptoWeakRandInt( peerCount );
740        int i = fencepost;
741        do {
742            if( clientIsDownloadingFrom( peers[i] ) )
743                ret[retCount++] = peers[i];
744            i = ( i + 1 ) % peerCount;
745            ++j;
746        } while( i != fencepost );
747    }
748    assert( j == peerCount );
749    *setmeCount = retCount;
750    return ret;
751}
752
753static uint32_t
754getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
755{
756    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
757    const uint64_t blockPos = tor->blockSize * b;
758    assert( blockPos >= piecePos );
759    return (uint32_t)( blockPos - piecePos );
760}
761
762static int
763refillUpkeep( void * vmgr )
764{
765    tr_torrent * tor = NULL;
766    tr_peerMgr * mgr = vmgr;
767    time_t now;
768    managerLock( mgr );
769
770    now = time( NULL );
771    while(( tor = tr_torrentNext( mgr->session, tor ))) {
772        Torrent * t = tor->torrentPeers;
773        if( t && t->refillQueue && ( t->refillQueue->expirationDate <= now ) ) {
774            tordbg( t, "refill queue is past its shelf date; discarding." );
775            blockIteratorFree( &t->refillQueue );
776        }
777    }
778
779    managerUnlock( mgr );
780    return TRUE;
781}
782
783static int
784refillPulse( void * vtorrent )
785{
786    tr_block_index_t block;
787    int peerCount;
788    int webseedCount;
789    tr_peer ** peers;
790    tr_webseed ** webseeds;
791    Torrent * t = vtorrent;
792    tr_torrent * tor = t->tor;
793    tr_bool hasNext = TRUE;
794
795    if( !t->isRunning )
796        return TRUE;
797    if( tr_torrentIsSeed( t->tor ) )
798        return TRUE;
799
800    torrentLock( t );
801    tordbg( t, "Refilling Request Buffers..." );
802
803    if( t->refillQueue == NULL )
804        t->refillQueue = blockIteratorNew( t );
805
806    peers = getPeersUploadingToClient( t, &peerCount );
807    webseedCount = tr_ptrArraySize( &t->webseeds );
808    webseeds = tr_memdup( tr_ptrArrayBase( &t->webseeds ),
809                          webseedCount * sizeof( tr_webseed* ) );
810
811    while( ( webseedCount || peerCount )
812        && (( hasNext = blockIteratorNext( t->refillQueue, &block ))) )
813    {
814        int j;
815        int handled = FALSE;
816
817        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
818        const uint32_t offset = getBlockOffsetInPiece( tor, block );
819        const uint32_t length = tr_torBlockCountBytes( tor, block );
820
821        /* find a peer who can ask for this block */
822        for( j=0; !handled && j<peerCount; )
823        {
824            const tr_addreq_t val = tr_peerMsgsAddRequest( peers[j]->msgs, index, offset, length );
825            switch( val )
826            {
827                case TR_ADDREQ_FULL:
828                case TR_ADDREQ_CLIENT_CHOKED:
829                    peers[j] = peers[--peerCount];
830                    break;
831
832                case TR_ADDREQ_MISSING:
833                case TR_ADDREQ_DUPLICATE:
834                    ++j;
835                    break;
836
837                case TR_ADDREQ_OK:
838                    incrementPieceRequests( t, index );
839                    handled = TRUE;
840                    break;
841
842                default:
843                    assert( 0 && "unhandled value" );
844                    break;
845            }
846        }
847
848        /* maybe one of the webseeds can do it */
849        for( j=0; !handled && j<webseedCount; )
850        {
851            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j], index, offset, length );
852            switch( val )
853            {
854                case TR_ADDREQ_FULL:
855                    webseeds[j] = webseeds[--webseedCount];
856                    break;
857
858                case TR_ADDREQ_OK:
859                    incrementPieceRequests( t, index );
860                    handled = TRUE;
861                    break;
862
863                default:
864                    assert( 0 && "unhandled value" );
865                    break;
866            }
867        }
868    }
869
870    /* cleanup */
871    tr_free( webseeds );
872    tr_free( peers );
873
874    /* if we're out of blocks to request, free the request queue */
875    if( !hasNext )
876        blockIteratorFree( &t->refillQueue );
877
878    t->refillTimer = NULL;
879    torrentUnlock( t );
880    return FALSE;
881}
882
883static void
884broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
885{
886    size_t i;
887    size_t peerCount;
888    tr_peer ** peers;
889
890    assert( torrentIsLocked( t ) );
891
892    tordbg( t, "got a block; cancelling any duplicate requests from peers %"PRIu32":%"PRIu32"->%"PRIu32, index, offset, length );
893
894    peerCount = tr_ptrArraySize( &t->peers );
895    peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
896    for( i=0; i<peerCount; ++i )
897        if( peers[i]->msgs )
898            tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
899}
900
901static void
902addStrike( Torrent * t,
903           tr_peer * peer )
904{
905    tordbg( t, "increasing peer %s strike count to %d",
906            tr_peerIoAddrStr( &peer->addr,
907                              peer->port ), peer->strikes + 1 );
908
909    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
910    {
911        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
912        atom->myflags |= MYFLAG_BANNED;
913        peer->doPurge = 1;
914        tordbg( t, "banning peer %s", tr_peerIoAddrStr( &atom->addr, atom->port ) );
915    }
916}
917
918static void
919gotBadPiece( Torrent *        t,
920             tr_piece_index_t pieceIndex )
921{
922    tr_torrent *   tor = t->tor;
923    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
924
925    tor->corruptCur += byteCount;
926    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
927}
928
929static void
930refillSoon( Torrent * t )
931{
932    if( t->refillTimer == NULL )
933        t->refillTimer = tr_timerNew( t->manager->session,
934                                      refillPulse, t,
935                                      REFILL_PERIOD_MSEC );
936}
937
938static void
939peerSuggestedPiece( Torrent            * t UNUSED,
940                    tr_peer            * peer UNUSED,
941                    tr_piece_index_t     pieceIndex UNUSED,
942                    int                  isFastAllowed UNUSED )
943{
944#if 0
945    assert( t );
946    assert( peer );
947    assert( peer->msgs );
948
949    /* is this a valid piece? */
950    if(  pieceIndex >= t->tor->info.pieceCount )
951        return;
952
953    /* don't ask for it if we've already got it */
954    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
955        return;
956
957    /* don't ask for it if they don't have it */
958    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
959        return;
960
961    /* don't ask for it if we're choked and it's not fast */
962    if( !isFastAllowed && peer->clientIsChoked )
963        return;
964
965    /* request the blocks that we don't have in this piece */
966    {
967        tr_block_index_t block;
968        const tr_torrent * tor = t->tor;
969        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
970        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
971
972        for( block=start; block<end; ++block )
973        {
974            if( !tr_cpBlockIsComplete( tor->completion, block ) )
975            {
976                const uint32_t offset = getBlockOffsetInPiece( tor, block );
977                const uint32_t length = tr_torBlockCountBytes( tor, block );
978                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
979                incrementPieceRequests( t, pieceIndex );
980            }
981        }
982    }
983#endif
984}
985
986static void
987fireRatioLimitHit( tr_torrent * tor )
988{
989    assert( tr_isTorrent( tor ) );
990
991    if( tor->ratio_limit_hit_func )
992        tor->ratio_limit_hit_func( tor, tor->ratio_limit_hit_func_user_data );
993}
994
995static void
996peerCallbackFunc( void * vpeer, void * vevent, void * vt )
997{
998    tr_peer * peer = vpeer; /* may be NULL if peer is a webseed */
999    Torrent * t = vt;
1000    const tr_peer_event * e = vevent;
1001
1002    torrentLock( t );
1003
1004    switch( e->eventType )
1005    {
1006        case TR_PEER_UPLOAD_ONLY:
1007            /* update our atom */
1008            if( peer ) {
1009                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1010                a->uploadOnly = e->uploadOnly ? UPLOAD_ONLY_YES : UPLOAD_ONLY_NO;
1011            }
1012            break;
1013
1014        case TR_PEER_NEED_REQ:
1015            refillSoon( t );
1016            break;
1017
1018        case TR_PEER_CANCEL:
1019            decrementPieceRequests( t, e->pieceIndex );
1020            break;
1021
1022        case TR_PEER_PEER_GOT_DATA:
1023        {
1024            const time_t now = time( NULL );
1025            tr_torrent * tor = t->tor;
1026            double seedRatio;
1027
1028            tor->activityDate = now;
1029
1030            if( e->wasPieceData )
1031                tor->uploadedCur += e->length;
1032
1033            /* update the stats */
1034            if( e->wasPieceData )
1035                tr_statsAddUploaded( tor->session, e->length );
1036
1037            /* update our atom */
1038            if( peer ) {
1039                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1040                if( e->wasPieceData )
1041                    a->piece_data_time = now;
1042            }
1043
1044            /* if we're seeding and we've reached our seed ratio limit, stop the torrent */
1045            if( tr_torrentIsSeed( tor ) && tr_torrentGetSeedRatio( tor, &seedRatio ) ) {
1046                const double up = tor->uploadedCur + tor->uploadedPrev;
1047                const double down = tor->downloadedCur + tor->downloadedPrev;
1048                const double ratio = tr_getRatio( up, down );
1049                if( ratio >= seedRatio ) {
1050                    tr_torrentStop( tor );
1051                   
1052                    /* set to no ratio limit to allow easy restarting */
1053                    tr_torrentSetRatioMode( tor, TR_RATIOLIMIT_UNLIMITED );
1054                   
1055                    fireRatioLimitHit( tor );
1056                }
1057            }
1058
1059            break;
1060        }
1061
1062        case TR_PEER_CLIENT_GOT_SUGGEST:
1063            if( peer )
1064                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1065            break;
1066
1067        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1068            if( peer )
1069                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1070            break;
1071
1072        case TR_PEER_CLIENT_GOT_DATA:
1073        {
1074            const time_t now = time( NULL );
1075            tr_torrent * tor = t->tor;
1076            tor->activityDate = now;
1077
1078            /* only add this to downloadedCur if we got it from a peer --
1079             * webseeds shouldn't count against our ratio.  As one tracker
1080             * admin put it, "Those pieces are downloaded directly from the
1081             * content distributor, not the peers, it is the tracker's job
1082             * to manage the swarms, not the web server and does not fit
1083             * into the jurisdiction of the tracker." */
1084            if( peer && e->wasPieceData )
1085                tor->downloadedCur += e->length;
1086
1087            /* update the stats */ 
1088            if( e->wasPieceData )
1089                tr_statsAddDownloaded( tor->session, e->length );
1090
1091            /* update our atom */
1092            if( peer ) {
1093                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1094                if( e->wasPieceData )
1095                    a->piece_data_time = now;
1096            }
1097
1098            break;
1099        }
1100
1101        case TR_PEER_PEER_PROGRESS:
1102        {
1103            if( peer )
1104            {
1105                struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1106                const int peerIsSeed = e->progress >= 1.0;
1107                if( peerIsSeed ) {
1108                    tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1109                    atom->flags |= ADDED_F_SEED_FLAG;
1110                } else {
1111                    tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1112                    atom->flags &= ~ADDED_F_SEED_FLAG;
1113                }
1114            }
1115            break;
1116        }
1117
1118        case TR_PEER_CLIENT_GOT_BLOCK:
1119        {
1120            tr_torrent * tor = t->tor;
1121
1122            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1123
1124            tr_cpBlockAdd( &tor->completion, block );
1125            decrementPieceRequests( t, e->pieceIndex );
1126
1127            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1128
1129            if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1130            {
1131                const tr_piece_index_t p = e->pieceIndex;
1132                const tr_bool ok = tr_ioTestPiece( tor, p, NULL, 0 );
1133
1134                if( !ok )
1135                {
1136                    tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1137                               (unsigned long)p );
1138                }
1139
1140                tr_torrentSetHasPiece( tor, p, ok );
1141                tr_torrentSetPieceChecked( tor, p, TRUE );
1142                tr_peerMgrSetBlame( tor, p, ok );
1143
1144                if( !ok )
1145                {
1146                    gotBadPiece( t, p );
1147                }
1148                else
1149                {
1150                    int i;
1151                    int peerCount;
1152                    tr_peer ** peers;
1153                    tr_file_index_t fileIndex;
1154
1155                    peerCount = tr_ptrArraySize( &t->peers );
1156                    peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1157                    for( i=0; i<peerCount; ++i )
1158                        tr_peerMsgsHave( peers[i]->msgs, p );
1159
1160                    for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex )
1161                    {
1162                        const tr_file * file = &tor->info.files[fileIndex];
1163                        if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) && tr_cpFileIsComplete( &tor->completion, fileIndex ) )
1164                        {
1165                            char * path = tr_buildPath( tor->downloadDir, file->name, NULL );
1166                            tordbg( t, "closing recently-completed file \"%s\"", path );
1167                            tr_fdFileClose( path );
1168                            tr_free( path );
1169                        }
1170                    }
1171                }
1172
1173                tr_torrentRecheckCompleteness( tor );
1174            }
1175            break;
1176        }
1177
1178        case TR_PEER_ERROR:
1179            if( e->err == EINVAL )
1180            {
1181                addStrike( t, peer );
1182                peer->doPurge = 1;
1183                tordbg( t, "setting %s doPurge flag because we got an EINVAL error", tr_peerIoAddrStr( &peer->addr, peer->port ) );
1184            }
1185            else if( ( e->err == ERANGE )
1186                  || ( e->err == EMSGSIZE )
1187                  || ( e->err == ENOTCONN ) )
1188            {
1189                /* some protocol error from the peer */
1190                peer->doPurge = 1;
1191                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error", tr_peerIoAddrStr( &peer->addr, peer->port ) );
1192            }
1193            else /* a local error, such as an IO error */
1194            {
1195                t->tor->error = e->err;
1196                tr_strlcpy( t->tor->errorString,
1197                            tr_strerror( t->tor->error ),
1198                            sizeof( t->tor->errorString ) );
1199                tr_torrentStop( t->tor );
1200            }
1201            break;
1202
1203        default:
1204            assert( 0 );
1205    }
1206
1207    torrentUnlock( t );
1208}
1209
1210static void
1211ensureAtomExists( Torrent          * t,
1212                  const tr_address * addr,
1213                  tr_port            port,
1214                  uint8_t            flags,
1215                  uint8_t            from )
1216{
1217    if( getExistingAtom( t, addr ) == NULL )
1218    {
1219        struct peer_atom * a;
1220        a = tr_new0( struct peer_atom, 1 );
1221        a->addr = *addr;
1222        a->port = port;
1223        a->flags = flags;
1224        a->from = from;
1225        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr( &a->addr, a->port ) );
1226        tr_ptrArrayInsertSorted( &t->pool, a, comparePeerAtoms );
1227    }
1228}
1229
1230static int
1231getMaxPeerCount( const tr_torrent * tor )
1232{
1233    return tor->maxConnectedPeers;
1234}
1235
1236static int
1237getPeerCount( const Torrent * t )
1238{
1239    return tr_ptrArraySize( &t->peers );// + tr_ptrArraySize( &t->outgoingHandshakes );
1240}
1241
1242/* FIXME: this is kind of a mess. */
1243static tr_bool
1244myHandshakeDoneCB( tr_handshake  * handshake,
1245                   tr_peerIo     * io,
1246                   tr_bool         isConnected,
1247                   const uint8_t * peer_id,
1248                   void          * vmanager )
1249{
1250    tr_bool            ok = isConnected;
1251    tr_bool            success = FALSE;
1252    tr_port            port;
1253    const tr_address * addr;
1254    tr_peerMgr       * manager = vmanager;
1255    Torrent          * t;
1256    tr_handshake     * ours;
1257
1258    assert( io );
1259    assert( tr_isBool( ok ) );
1260
1261    t = tr_peerIoHasTorrentHash( io )
1262        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1263        : NULL;
1264
1265    if( tr_peerIoIsIncoming ( io ) )
1266        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1267                                        handshake, handshakeCompare );
1268    else if( t )
1269        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1270                                        handshake, handshakeCompare );
1271    else
1272        ours = handshake;
1273
1274    assert( ours );
1275    assert( ours == handshake );
1276
1277    if( t )
1278        torrentLock( t );
1279
1280    addr = tr_peerIoGetAddress( io, &port );
1281
1282    if( !ok || !t || !t->isRunning )
1283    {
1284        if( t )
1285        {
1286            struct peer_atom * atom = getExistingAtom( t, addr );
1287            if( atom )
1288                ++atom->numFails;
1289        }
1290    }
1291    else /* looking good */
1292    {
1293        struct peer_atom * atom;
1294        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1295        atom = getExistingAtom( t, addr );
1296        atom->time = time( NULL );
1297        atom->piece_data_time = 0;
1298
1299        if( atom->myflags & MYFLAG_BANNED )
1300        {
1301            tordbg( t, "banned peer %s tried to reconnect",
1302                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
1303        }
1304        else if( tr_peerIoIsIncoming( io )
1305               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1306
1307        {
1308        }
1309        else
1310        {
1311            tr_peer * peer = getExistingPeer( t, addr );
1312
1313            if( peer ) /* we already have this peer */
1314            {
1315            }
1316            else
1317            {
1318                peer = getPeer( t, addr );
1319                tr_free( peer->client );
1320
1321                if( !peer_id )
1322                    peer->client = NULL;
1323                else {
1324                    char client[128];
1325                    tr_clientForId( client, sizeof( client ), peer_id );
1326                    peer->client = tr_strdup( client );
1327                }
1328
1329                peer->port = port;
1330                peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is
1331                                                                balanced by our unref in peerDestructor()  */
1332                tr_peerIoSetParent( peer->io, t->tor->bandwidth );
1333                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1334
1335                success = TRUE;
1336            }
1337        }
1338    }
1339
1340    if( t )
1341        torrentUnlock( t );
1342
1343    return success;
1344}
1345
1346void
1347tr_peerMgrAddIncoming( tr_peerMgr * manager,
1348                       tr_address * addr,
1349                       tr_port      port,
1350                       int          socket )
1351{
1352    managerLock( manager );
1353
1354    if( tr_sessionIsAddressBlocked( manager->session, addr ) )
1355    {
1356        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
1357        tr_netClose( socket );
1358    }
1359    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
1360    {
1361        tr_netClose( socket );
1362    }
1363    else /* we don't have a connetion to them yet... */
1364    {
1365        tr_peerIo *    io;
1366        tr_handshake * handshake;
1367
1368        io = tr_peerIoNewIncoming( manager->session, manager->session->bandwidth, addr, port, socket );
1369
1370        handshake = tr_handshakeNew( io,
1371                                     manager->session->encryptionMode,
1372                                     myHandshakeDoneCB,
1373                                     manager );
1374
1375        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
1376
1377        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
1378                                 handshakeCompare );
1379    }
1380
1381    managerUnlock( manager );
1382}
1383
1384static tr_bool
1385tr_isPex( const tr_pex * pex )
1386{
1387    return pex && tr_isAddress( &pex->addr );
1388}
1389
1390void
1391tr_peerMgrAddPex( tr_torrent   *  tor,
1392                  uint8_t         from,
1393                  const tr_pex *  pex )
1394{
1395    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
1396    {
1397        Torrent * t = tor->torrentPeers;
1398        managerLock( t->manager );
1399
1400        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
1401            if( tr_isValidPeerAddress( &pex->addr, pex->port ) )
1402                ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from );
1403
1404        managerUnlock( t->manager );
1405    }
1406}
1407
1408tr_pex *
1409tr_peerMgrCompactToPex( const void *    compact,
1410                        size_t          compactLen,
1411                        const uint8_t * added_f,
1412                        size_t          added_f_len,
1413                        size_t *        pexCount )
1414{
1415    size_t          i;
1416    size_t          n = compactLen / 6;
1417    const uint8_t * walk = compact;
1418    tr_pex *        pex = tr_new0( tr_pex, n );
1419
1420    for( i = 0; i < n; ++i )
1421    {
1422        pex[i].addr.type = TR_AF_INET;
1423        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
1424        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1425        if( added_f && ( n == added_f_len ) )
1426            pex[i].flags = added_f[i];
1427    }
1428
1429    *pexCount = n;
1430    return pex;
1431}
1432
1433tr_pex *
1434tr_peerMgrCompact6ToPex( const void    * compact,
1435                         size_t          compactLen,
1436                         const uint8_t * added_f,
1437                         size_t          added_f_len,
1438                         size_t        * pexCount )
1439{
1440    size_t          i;
1441    size_t          n = compactLen / 18;
1442    const uint8_t * walk = compact;
1443    tr_pex *        pex = tr_new0( tr_pex, n );
1444   
1445    for( i = 0; i < n; ++i )
1446    {
1447        pex[i].addr.type = TR_AF_INET6;
1448        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
1449        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1450        if( added_f && ( n == added_f_len ) )
1451            pex[i].flags = added_f[i];
1452    }
1453   
1454    *pexCount = n;
1455    return pex;
1456}
1457
1458tr_pex *
1459tr_peerMgrArrayToPex( const void * array,
1460                      size_t       arrayLen,
1461                      size_t      * pexCount )
1462{
1463    size_t          i;
1464    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
1465    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
1466    const uint8_t * walk = array;
1467    tr_pex        * pex = tr_new0( tr_pex, n );
1468   
1469    for( i = 0 ; i < n ; i++ ) {
1470        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
1471        tr_suspectAddress( &pex[i].addr, "tracker"  );
1472        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
1473        pex[i].flags = 0x00;
1474        walk += sizeof( tr_address ) + 2;
1475    }
1476   
1477    *pexCount = n;
1478    return pex;
1479}
1480
1481/**
1482***
1483**/
1484
1485void
1486tr_peerMgrSetBlame( tr_torrent     * tor,
1487                    tr_piece_index_t pieceIndex,
1488                    int              success )
1489{
1490    if( !success )
1491    {
1492        int        peerCount, i;
1493        Torrent *  t = tor->torrentPeers;
1494        tr_peer ** peers;
1495
1496        assert( torrentIsLocked( t ) );
1497
1498        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
1499        for( i = 0; i < peerCount; ++i )
1500        {
1501            tr_peer * peer = peers[i];
1502            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1503            {
1504                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1505                        tr_peerIoAddrStr( &peer->addr, peer->port ),
1506                        pieceIndex, (int)peer->strikes + 1 );
1507                addStrike( t, peer );
1508            }
1509        }
1510    }
1511}
1512
1513int
1514tr_pexCompare( const void * va, const void * vb )
1515{
1516    const tr_pex * a = va;
1517    const tr_pex * b = vb;
1518    int i;
1519
1520    assert( tr_isPex( a ) );
1521    assert( tr_isPex( b ) );
1522
1523    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
1524        return i;
1525
1526    if( a->port != b->port )
1527        return a->port < b->port ? -1 : 1;
1528
1529    return 0;
1530}
1531
1532static int
1533peerPrefersCrypto( const tr_peer * peer )
1534{
1535    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1536        return TRUE;
1537
1538    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1539        return FALSE;
1540
1541    return tr_peerIoIsEncrypted( peer->io );
1542}
1543
1544int
1545tr_peerMgrGetPeers( tr_torrent      * tor,
1546                    tr_pex         ** setme_pex,
1547                    uint8_t           af)
1548{
1549    int peersReturning = 0;
1550    const Torrent * t = tor->torrentPeers;
1551
1552    managerLock( t->manager );
1553
1554    {
1555        int i;
1556        const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1557        const int peerCount = tr_ptrArraySize( &t->peers );
1558        /* for now, this will waste memory on torrents that have both
1559         * ipv6 and ipv4 peers */
1560        tr_pex * pex = tr_new( tr_pex, peerCount );
1561        tr_pex * walk = pex;
1562
1563        for( i=0; i<peerCount; ++i )
1564        {
1565            const tr_peer * peer = peers[i];
1566            if( peer->addr.type == af )
1567            {
1568                const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1569
1570                assert( tr_isAddress( &peer->addr ) );
1571                walk->addr = peer->addr;
1572                walk->port = peer->port;
1573                walk->flags = 0;
1574                if( peerPrefersCrypto( peer ) )
1575                    walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1576                if( ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 ) )
1577                    walk->flags |= ADDED_F_SEED_FLAG;
1578                ++peersReturning;
1579                ++walk;
1580            }
1581        }
1582
1583        assert( ( walk - pex ) == peersReturning );
1584        qsort( pex, peersReturning, sizeof( tr_pex ), tr_pexCompare );
1585        *setme_pex = pex;
1586    }
1587
1588    managerUnlock( t->manager );
1589    return peersReturning;
1590}
1591
1592void
1593tr_peerMgrStartTorrent( tr_torrent * tor )
1594{
1595    Torrent * t = tor->torrentPeers;
1596
1597    managerLock( t->manager );
1598
1599    assert( t );
1600
1601    if( !t->isRunning )
1602    {
1603        t->isRunning = TRUE;
1604
1605        if( !tr_ptrArrayEmpty( &t->webseeds ) )
1606            refillSoon( t );
1607    }
1608
1609    managerUnlock( t->manager );
1610}
1611
1612static void
1613stopTorrent( Torrent * t )
1614{
1615    assert( torrentIsLocked( t ) );
1616
1617    t->isRunning = FALSE;
1618
1619    /* disconnect the peers. */
1620    tr_ptrArrayForeach( &t->peers, (PtrArrayForeachFunc)peerDestructor );
1621    tr_ptrArrayClear( &t->peers );
1622
1623    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1624     * which removes the handshake from t->outgoingHandshakes... */
1625    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
1626        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
1627}
1628
1629void
1630tr_peerMgrStopTorrent( tr_torrent * tor )
1631{
1632    Torrent * t = tor->torrentPeers;
1633
1634    managerLock( t->manager );
1635
1636    stopTorrent( t );
1637
1638    managerUnlock( t->manager );
1639}
1640
1641void
1642tr_peerMgrAddTorrent( tr_peerMgr * manager,
1643                      tr_torrent * tor )
1644{
1645    managerLock( manager );
1646
1647    assert( tor );
1648    assert( tor->torrentPeers == NULL );
1649
1650    tor->torrentPeers = torrentConstructor( manager, tor );
1651
1652    managerUnlock( manager );
1653}
1654
1655void
1656tr_peerMgrRemoveTorrent( tr_torrent * tor )
1657{
1658    tr_torrentLock( tor );
1659
1660    stopTorrent( tor->torrentPeers );
1661    torrentDestructor( tor->torrentPeers );
1662
1663    tr_torrentUnlock( tor );
1664}
1665
1666void
1667tr_peerMgrTorrentAvailability( const tr_torrent * tor,
1668                               int8_t           * tab,
1669                               unsigned int       tabCount )
1670{
1671    tr_piece_index_t   i;
1672    const Torrent *    t;
1673    float              interval;
1674    tr_bool            isSeed;
1675    int                peerCount;
1676    const tr_peer **   peers;
1677    tr_torrentLock( tor );
1678
1679    t = tor->torrentPeers;
1680    tor = t->tor;
1681    interval = tor->info.pieceCount / (float)tabCount;
1682    isSeed = tor && ( tr_cpGetStatus ( &tor->completion ) == TR_SEED );
1683    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1684    peerCount = tr_ptrArraySize( &t->peers );
1685
1686    memset( tab, 0, tabCount );
1687
1688    for( i = 0; tor && i < tabCount; ++i )
1689    {
1690        const int piece = i * interval;
1691
1692        if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
1693            tab[i] = -1;
1694        else if( peerCount ) {
1695            int j;
1696            for( j = 0; j < peerCount; ++j )
1697                if( tr_bitfieldHas( peers[j]->have, i ) )
1698                    ++tab[i];
1699        }
1700    }
1701
1702    tr_torrentUnlock( tor );
1703}
1704
1705/* Returns the pieces that are available from peers */
1706tr_bitfield*
1707tr_peerMgrGetAvailable( const tr_torrent * tor )
1708{
1709    int i;
1710    int peerCount;
1711    Torrent * t = tor->torrentPeers;
1712    const tr_peer ** peers;
1713    tr_bitfield * pieces;
1714    managerLock( t->manager );
1715
1716    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1717    peerCount = tr_ptrArraySize( &t->peers );
1718    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1719    for( i=0; i<peerCount; ++i )
1720        tr_bitfieldOr( pieces, peers[i]->have );
1721
1722    managerUnlock( t->manager );
1723    return pieces;
1724}
1725
1726void
1727tr_peerMgrTorrentStats( tr_torrent       * tor,
1728                        int              * setmePeersKnown,
1729                        int              * setmePeersConnected,
1730                        int              * setmeSeedsConnected,
1731                        int              * setmeWebseedsSendingToUs,
1732                        int              * setmePeersSendingToUs,
1733                        int              * setmePeersGettingFromUs,
1734                        int              * setmePeersFrom )
1735{
1736    int i, size;
1737    const Torrent * t = tor->torrentPeers;
1738    const tr_peer ** peers;
1739    const tr_webseed ** webseeds;
1740
1741    managerLock( t->manager );
1742
1743    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1744    size = tr_ptrArraySize( &t->peers );
1745
1746    *setmePeersKnown           = tr_ptrArraySize( &t->pool );
1747    *setmePeersConnected       = 0;
1748    *setmeSeedsConnected       = 0;
1749    *setmePeersGettingFromUs   = 0;
1750    *setmePeersSendingToUs     = 0;
1751    *setmeWebseedsSendingToUs  = 0;
1752
1753    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1754        setmePeersFrom[i] = 0;
1755
1756    for( i=0; i<size; ++i )
1757    {
1758        const tr_peer * peer = peers[i];
1759        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1760
1761        if( peer->io == NULL ) /* not connected */
1762            continue;
1763
1764        ++*setmePeersConnected;
1765
1766        ++setmePeersFrom[atom->from];
1767
1768        if( clientIsDownloadingFrom( peer ) )
1769            ++*setmePeersSendingToUs;
1770
1771        if( clientIsUploadingTo( peer ) )
1772            ++*setmePeersGettingFromUs;
1773
1774        if( atom->flags & ADDED_F_SEED_FLAG )
1775            ++*setmeSeedsConnected;
1776    }
1777
1778    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1779    size = tr_ptrArraySize( &t->webseeds );
1780    for( i=0; i<size; ++i )
1781        if( tr_webseedIsActive( webseeds[i] ) )
1782            ++*setmeWebseedsSendingToUs;
1783
1784    managerUnlock( t->manager );
1785}
1786
1787float*
1788tr_peerMgrWebSpeeds( const tr_torrent * tor )
1789{
1790    const Torrent * t = tor->torrentPeers;
1791    const tr_webseed ** webseeds;
1792    int i;
1793    int webseedCount;
1794    float * ret;
1795    uint64_t now;
1796
1797    assert( t->manager );
1798    managerLock( t->manager );
1799
1800    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1801    webseedCount = tr_ptrArraySize( &t->webseeds );
1802    assert( webseedCount == tor->info.webseedCount );
1803    ret = tr_new0( float, webseedCount );
1804    now = tr_date( );
1805
1806    for( i=0; i<webseedCount; ++i )
1807        if( !tr_webseedGetSpeed( webseeds[i], now, &ret[i] ) )
1808            ret[i] = -1.0;
1809
1810    managerUnlock( t->manager );
1811    return ret;
1812}
1813
1814double
1815tr_peerGetPieceSpeed( const tr_peer * peer, uint64_t now, tr_direction direction )
1816{
1817    return peer->io ? tr_peerIoGetPieceSpeed( peer->io, now, direction ) : 0.0;
1818}
1819
1820
1821struct tr_peer_stat *
1822tr_peerMgrPeerStats( const tr_torrent    * tor,
1823                     int                 * setmeCount )
1824{
1825    int i, size;
1826    const Torrent * t = tor->torrentPeers;
1827    const tr_peer ** peers;
1828    tr_peer_stat * ret;
1829    uint64_t now;
1830
1831    assert( t->manager );
1832    managerLock( t->manager );
1833
1834    size = tr_ptrArraySize( &t->peers );
1835    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1836    ret = tr_new0( tr_peer_stat, size );
1837    now = tr_date( );
1838
1839    for( i = 0; i < size; ++i )
1840    {
1841        char *                   pch;
1842        const tr_peer *          peer = peers[i];
1843        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1844        tr_peer_stat *           stat = ret + i;
1845        tr_address               norm_addr;
1846
1847        norm_addr = peer->addr;
1848        tr_normalizeV4Mapped( &norm_addr );
1849        tr_ntop( &norm_addr, stat->addr, sizeof( stat->addr ) );
1850        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
1851                   sizeof( stat->client ) );
1852        stat->port               = ntohs( peer->port );
1853        stat->from               = atom->from;
1854        stat->progress           = peer->progress;
1855        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1856        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER );
1857        stat->rateToClient       = tr_peerGetPieceSpeed( peer, now, TR_PEER_TO_CLIENT );
1858        stat->peerIsChoked       = peer->peerIsChoked;
1859        stat->peerIsInterested   = peer->peerIsInterested;
1860        stat->clientIsChoked     = peer->clientIsChoked;
1861        stat->clientIsInterested = peer->clientIsInterested;
1862        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1863        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1864        stat->isUploadingTo      = clientIsUploadingTo( peer );
1865        stat->isSeed             = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
1866
1867        pch = stat->flagStr;
1868        if( t->optimistic == peer ) *pch++ = 'O';
1869        if( stat->isDownloadingFrom ) *pch++ = 'D';
1870        else if( stat->clientIsInterested ) *pch++ = 'd';
1871        if( stat->isUploadingTo ) *pch++ = 'U';
1872        else if( stat->peerIsInterested ) *pch++ = 'u';
1873        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
1874        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1875        if( stat->isEncrypted ) *pch++ = 'E';
1876        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1877        if( stat->isIncoming ) *pch++ = 'I';
1878        *pch = '\0';
1879    }
1880
1881    *setmeCount = size;
1882
1883    managerUnlock( t->manager );
1884    return ret;
1885}
1886
1887/**
1888***
1889**/
1890
1891struct ChokeData
1892{
1893    tr_bool         doUnchoke;
1894    tr_bool         isInterested;
1895    tr_bool         isChoked;
1896    int             rate;
1897    tr_peer *       peer;
1898};
1899
1900static int
1901compareChoke( const void * va,
1902              const void * vb )
1903{
1904    const struct ChokeData * a = va;
1905    const struct ChokeData * b = vb;
1906
1907    if( a->rate != b->rate ) /* prefer higher overall speeds */
1908        return a->rate > b->rate ? -1 : 1;
1909
1910    if( a->isChoked != b->isChoked ) /* prefer unchoked */
1911        return a->isChoked ? 1 : -1;
1912
1913    return 0;
1914}
1915
1916static int
1917isNew( const tr_peer * peer )
1918{
1919    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1920}
1921
1922static int
1923isSame( const tr_peer * peer )
1924{
1925    return peer && peer->client && strstr( peer->client, "Transmission" );
1926}
1927
1928/**
1929***
1930**/
1931
1932static void
1933rechokeTorrent( Torrent * t )
1934{
1935    int i, size, unchokedInterested;
1936    const int peerCount = tr_ptrArraySize( &t->peers );
1937    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1938    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1939    const tr_session * session = t->manager->session;
1940    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
1941    const uint64_t now = tr_date( );
1942
1943    assert( torrentIsLocked( t ) );
1944
1945    /* sort the peers by preference and rate */
1946    for( i = 0, size = 0; i < peerCount; ++i )
1947    {
1948        tr_peer * peer = peers[i];
1949        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1950
1951        if( peer->progress >= 1.0 ) /* choke all seeds */
1952        {
1953            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1954        }
1955        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
1956        {
1957            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1958        }
1959        else if( chokeAll ) /* choke everyone if we're not uploading */
1960        {
1961            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1962        }
1963        else
1964        {
1965            struct ChokeData * n = &choke[size++];
1966            n->peer         = peer;
1967            n->isInterested = peer->peerIsInterested;
1968            n->isChoked     = peer->peerIsChoked;
1969            n->rate         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER ) * 1024;
1970        }
1971    }
1972
1973    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
1974
1975    /**
1976     * Reciprocation and number of uploads capping is managed by unchoking
1977     * the N peers which have the best upload rate and are interested.
1978     * This maximizes the client's download rate. These N peers are
1979     * referred to as downloaders, because they are interested in downloading
1980     * from the client.
1981     *
1982     * Peers which have a better upload rate (as compared to the downloaders)
1983     * but aren't interested get unchoked. If they become interested, the
1984     * downloader with the worst upload rate gets choked. If a client has
1985     * a complete file, it uses its upload rate rather than its download
1986     * rate to decide which peers to unchoke.
1987     */
1988    unchokedInterested = 0;
1989    for( i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i ) {
1990        choke[i].doUnchoke = 1;
1991        if( choke[i].isInterested )
1992            ++unchokedInterested;
1993    }
1994
1995    /* optimistic unchoke */
1996    if( i < size )
1997    {
1998        int n;
1999        struct ChokeData * c;
2000        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
2001
2002        for( ; i<size; ++i )
2003        {
2004            if( choke[i].isInterested )
2005            {
2006                const tr_peer * peer = choke[i].peer;
2007                int x = 1, y;
2008                if( isNew( peer ) ) x *= 3;
2009                if( isSame( peer ) ) x *= 3;
2010                for( y=0; y<x; ++y )
2011                    tr_ptrArrayAppend( &randPool, &choke[i] );
2012            }
2013        }
2014
2015        if(( n = tr_ptrArraySize( &randPool )))
2016        {
2017            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
2018            c->doUnchoke = 1;
2019            t->optimistic = c->peer;
2020        }
2021
2022        tr_ptrArrayDestruct( &randPool, NULL );
2023    }
2024
2025    for( i=0; i<size; ++i )
2026        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
2027
2028    /* cleanup */
2029    tr_free( choke );
2030}
2031
2032static int
2033rechokePulse( void * vmgr )
2034{
2035    tr_torrent * tor = NULL;
2036    tr_peerMgr * mgr = vmgr;
2037    managerLock( mgr );
2038
2039    while(( tor = tr_torrentNext( mgr->session, tor )))
2040        if( tor->isRunning )
2041            rechokeTorrent( tor->torrentPeers );
2042
2043    managerUnlock( mgr );
2044    return TRUE;
2045}
2046
2047/***
2048****
2049****  Life and Death
2050****
2051***/
2052
2053typedef enum
2054{
2055    TR_CAN_KEEP,
2056    TR_CAN_CLOSE,
2057    TR_MUST_CLOSE,
2058}
2059tr_close_type_t;
2060
2061static tr_close_type_t
2062shouldPeerBeClosed( const Torrent    * t,
2063                    const tr_peer    * peer,
2064                    int                peerCount )
2065{
2066    const tr_torrent *       tor = t->tor;
2067    const time_t             now = time( NULL );
2068    const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
2069
2070    /* if it's marked for purging, close it */
2071    if( peer->doPurge )
2072    {
2073        tordbg( t, "purging peer %s because its doPurge flag is set",
2074                tr_peerIoAddrStr( &atom->addr, atom->port ) );
2075        return TR_MUST_CLOSE;
2076    }
2077
2078    /* if we're seeding and the peer has everything we have,
2079     * and enough time has passed for a pex exchange, then disconnect */
2080    if( tr_torrentIsSeed( tor ) )
2081    {
2082        int peerHasEverything;
2083        if( atom->flags & ADDED_F_SEED_FLAG )
2084            peerHasEverything = TRUE;
2085        else if( peer->progress < tr_cpPercentDone( &tor->completion ) )
2086            peerHasEverything = FALSE;
2087        else {
2088            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( &tor->completion ) );
2089            tr_bitfieldDifference( tmp, peer->have );
2090            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2091            tr_bitfieldFree( tmp );
2092        }
2093
2094        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 )))
2095        {
2096            tordbg( t, "purging peer %s because we're both seeds",
2097                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
2098            return TR_MUST_CLOSE;
2099        }
2100    }
2101
2102    /* disconnect if it's been too long since piece data has been transferred.
2103     * this is on a sliding scale based on number of available peers... */
2104    {
2105        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2106        /* if we have >= relaxIfFewerThan, strictness is 100%.
2107         * if we have zero connections, strictness is 0% */
2108        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
2109                               ? 1.0
2110                               : peerCount / (float)relaxStrictnessIfFewerThanN;
2111        const int lo = MIN_UPLOAD_IDLE_SECS;
2112        const int hi = MAX_UPLOAD_IDLE_SECS;
2113        const int limit = hi - ( ( hi - lo ) * strictness );
2114        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
2115/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
2116        if( idleTime > limit ) {
2117            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
2118                       tr_peerIoAddrStr( &atom->addr, atom->port ), idleTime );
2119            return TR_CAN_CLOSE;
2120        }
2121    }
2122
2123    return TR_CAN_KEEP;
2124}
2125
2126static tr_peer **
2127getPeersToClose( Torrent * t, tr_close_type_t closeType, int * setmeSize )
2128{
2129    int i, peerCount, outsize;
2130    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
2131    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2132
2133    assert( torrentIsLocked( t ) );
2134
2135    for( i = outsize = 0; i < peerCount; ++i )
2136        if( shouldPeerBeClosed( t, peers[i], peerCount ) == closeType )
2137            ret[outsize++] = peers[i];
2138
2139    *setmeSize = outsize;
2140    return ret;
2141}
2142
2143static int
2144compareCandidates( const void * va,
2145                   const void * vb )
2146{
2147    const struct peer_atom * a = *(const struct peer_atom**) va;
2148    const struct peer_atom * b = *(const struct peer_atom**) vb;
2149
2150    /* <Charles> Here we would probably want to try reconnecting to
2151     * peers that had most recently given us data. Lots of users have
2152     * trouble with resets due to their routers and/or ISPs. This way we
2153     * can quickly recover from an unwanted reset. So we sort
2154     * piece_data_time in descending order.
2155     */
2156
2157    if( a->piece_data_time != b->piece_data_time )
2158        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2159
2160    if( a->numFails != b->numFails )
2161        return a->numFails < b->numFails ? -1 : 1;
2162
2163    if( a->time != b->time )
2164        return a->time < b->time ? -1 : 1;
2165
2166    /* all other things being equal, prefer peers whose
2167     * information comes from a more reliable source */
2168    if( a->from != b->from )
2169        return a->from < b->from ? -1 : 1;
2170
2171    return 0;
2172}
2173
2174static int
2175getReconnectIntervalSecs( const struct peer_atom * atom )
2176{
2177    int          sec;
2178    const time_t now = time( NULL );
2179
2180    /* if we were recently connected to this peer and transferring piece
2181     * data, try to reconnect to them sooner rather that later -- we don't
2182     * want network troubles to get in the way of a good peer. */
2183    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2184        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2185
2186    /* don't allow reconnects more often than our minimum */
2187    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2188        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2189
2190    /* otherwise, the interval depends on how many times we've tried
2191     * and failed to connect to the peer */
2192    else switch( atom->numFails ) {
2193        case 0: sec = 0; break;
2194        case 1: sec = 5; break;
2195        case 2: sec = 2 * 60; break;
2196        case 3: sec = 15 * 60; break;
2197        case 4: sec = 30 * 60; break;
2198        case 5: sec = 60 * 60; break;
2199        default: sec = 120 * 60; break;
2200    }
2201
2202    return sec;
2203}
2204
2205static struct peer_atom **
2206getPeerCandidates( Torrent * t, int * setmeSize )
2207{
2208    int                 i, atomCount, retCount;
2209    struct peer_atom ** atoms;
2210    struct peer_atom ** ret;
2211    const time_t        now = time( NULL );
2212    const int           seed = tr_torrentIsSeed( t->tor );
2213
2214    assert( torrentIsLocked( t ) );
2215
2216    atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
2217    ret = tr_new( struct peer_atom*, atomCount );
2218    for( i = retCount = 0; i < atomCount; ++i )
2219    {
2220        int                interval;
2221        struct peer_atom * atom = atoms[i];
2222
2223        /* peer fed us too much bad data ... we only keep it around
2224         * now to weed it out in case someone sends it to us via pex */
2225        if( atom->myflags & MYFLAG_BANNED )
2226            continue;
2227
2228        /* peer was unconnectable before, so we're not going to keep trying.
2229         * this is needs a separate flag from `banned', since if they try
2230         * to connect to us later, we'll let them in */
2231        if( atom->myflags & MYFLAG_UNREACHABLE )
2232            continue;
2233
2234        /* we don't need two connections to the same peer... */
2235        if( peerIsInUse( t, &atom->addr ) )
2236            continue;
2237
2238        /* no need to connect if we're both seeds... */
2239        if( seed && ( ( atom->flags & ADDED_F_SEED_FLAG ) ||
2240                      ( atom->uploadOnly == UPLOAD_ONLY_YES ) ) )
2241            continue;
2242
2243        /* don't reconnect too often */
2244        interval = getReconnectIntervalSecs( atom );
2245        if( ( now - atom->time ) < interval )
2246        {
2247            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2248                    i, tr_peerIoAddrStr( &atom->addr, atom->port ), interval );
2249            continue;
2250        }
2251
2252        /* Don't connect to peers in our blocklist */
2253        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2254            continue;
2255
2256        ret[retCount++] = atom;
2257    }
2258
2259    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2260    *setmeSize = retCount;
2261    return ret;
2262}
2263
2264static void
2265closePeer( Torrent * t, tr_peer * peer )
2266{
2267    struct peer_atom * atom;
2268
2269    assert( t != NULL );
2270    assert( peer != NULL );
2271
2272    /* if we transferred piece data, then they might be good peers,
2273       so reset their `numFails' weight to zero.  otherwise we connected
2274       to them fruitlessly, so mark it as another fail */
2275    atom = getExistingAtom( t, &peer->addr );
2276    if( atom->piece_data_time )
2277        atom->numFails = 0;
2278    else
2279        ++atom->numFails;
2280
2281    tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
2282    removePeer( t, peer );
2283}
2284
2285static void
2286reconnectTorrent( Torrent * t )
2287{
2288    static time_t prevTime = 0;
2289    static int    newConnectionsThisSecond = 0;
2290    time_t        now;
2291
2292    now = time( NULL );
2293    if( prevTime != now )
2294    {
2295        prevTime = now;
2296        newConnectionsThisSecond = 0;
2297    }
2298
2299    if( !t->isRunning )
2300    {
2301        removeAllPeers( t );
2302    }
2303    else
2304    {
2305        int i;
2306        int canCloseCount;
2307        int mustCloseCount;
2308        int candidateCount;
2309        int maxCandidates;
2310        struct tr_peer ** canClose = getPeersToClose( t, TR_CAN_CLOSE, &canCloseCount );
2311        struct tr_peer ** mustClose = getPeersToClose( t, TR_MUST_CLOSE, &mustCloseCount );
2312        struct peer_atom ** candidates = getPeerCandidates( t, &candidateCount );
2313
2314        tordbg( t, "reconnect pulse for [%s]: "
2315                   "%d must-close connections, "
2316                   "%d can-close connections, "
2317                   "%d connection candidates, "
2318                   "%d atoms, "
2319                   "max per pulse is %d",
2320                   t->tor->info.name,
2321                   mustCloseCount,
2322                   canCloseCount,
2323                   candidateCount,
2324                   tr_ptrArraySize( &t->pool ),
2325                   MAX_RECONNECTIONS_PER_PULSE );
2326
2327        /* disconnect the really bad peers */
2328        for( i=0; i<mustCloseCount; ++i )
2329            closePeer( t, mustClose[i] );
2330
2331        /* decide how many peers can we try to add in this pass */
2332        maxCandidates = candidateCount;
2333        maxCandidates = MIN( maxCandidates, MAX_RECONNECTIONS_PER_PULSE );
2334        maxCandidates = MIN( maxCandidates, getMaxPeerCount( t->tor ) - getPeerCount( t ) );
2335        maxCandidates = MIN( maxCandidates, MAX_CONNECTIONS_PER_SECOND - newConnectionsThisSecond );
2336
2337        /* maybe disconnect some lesser peers, if we have candidates to replace them with */
2338        for( i=0; ( i<canCloseCount ) && ( i<maxCandidates ); ++i )
2339            closePeer( t, canClose[i] );
2340
2341        tordbg( t, "candidateCount is %d, MAX_RECONNECTIONS_PER_PULSE is %d,"
2342                   " getPeerCount(t) is %d, getMaxPeerCount(t) is %d, "
2343                   "newConnectionsThisSecond is %d, MAX_CONNECTIONS_PER_SECOND is %d",
2344                   candidateCount,
2345                   MAX_RECONNECTIONS_PER_PULSE,
2346                   getPeerCount( t ),
2347                   getMaxPeerCount( t->tor ),
2348                   newConnectionsThisSecond, MAX_CONNECTIONS_PER_SECOND );
2349
2350        /* add some new ones */
2351        for( i=0; i<maxCandidates; ++i )
2352        {
2353            tr_peerMgr        * mgr = t->manager;
2354            struct peer_atom  * atom = candidates[i];
2355            tr_peerIo         * io;
2356
2357            tordbg( t, "Starting an OUTGOING connection with %s",
2358                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2359
2360            io = tr_peerIoNewOutgoing( mgr->session, mgr->session->bandwidth, &atom->addr, atom->port, t->hash );
2361
2362            if( io == NULL )
2363            {
2364                tordbg( t, "peerIo not created; marking peer %s as unreachable",
2365                        tr_peerIoAddrStr( &atom->addr, atom->port ) );
2366                atom->myflags |= MYFLAG_UNREACHABLE;
2367            }
2368            else
2369            {
2370                tr_handshake * handshake = tr_handshakeNew( io,
2371                                                            mgr->session->encryptionMode,
2372                                                            myHandshakeDoneCB,
2373                                                            mgr );
2374
2375                assert( tr_peerIoGetTorrentHash( io ) );
2376
2377                tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewOutgoing() */
2378
2379                ++newConnectionsThisSecond;
2380
2381                tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
2382                                         handshakeCompare );
2383            }
2384
2385            atom->time = time( NULL );
2386        }
2387
2388        /* cleanup */
2389        tr_free( candidates );
2390        tr_free( mustClose );
2391        tr_free( canClose );
2392    }
2393}
2394
2395static int
2396reconnectPulse( void * vmgr )
2397{
2398    tr_torrent * tor = NULL;
2399    tr_peerMgr * mgr = vmgr;
2400    managerLock( mgr );
2401
2402    while(( tor = tr_torrentNext( mgr->session, tor )))
2403        if( tor->isRunning )
2404            reconnectTorrent( tor->torrentPeers );
2405
2406    managerUnlock( mgr );
2407    return TRUE;
2408}
2409
2410/****
2411*****
2412*****  BANDWIDTH ALLOCATION
2413*****
2414****/
2415
2416static void
2417pumpAllPeers( tr_peerMgr * mgr )
2418{
2419    tr_torrent * tor = NULL;
2420
2421    while(( tor = tr_torrentNext( mgr->session, tor )))
2422    {
2423        int j;
2424        Torrent * t = tor->torrentPeers;
2425
2426        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
2427        {
2428            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
2429            tr_peerMsgsPulse( peer->msgs );
2430        }
2431    }
2432}
2433
2434static int
2435bandwidthPulse( void * vmgr )
2436{
2437    tr_peerMgr * mgr = vmgr;
2438    managerLock( mgr );
2439
2440    /* FIXME: this next line probably isn't necessary... */
2441    pumpAllPeers( mgr );
2442
2443    /* allocate bandwidth to the peers */
2444    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
2445    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
2446
2447    managerUnlock( mgr );
2448    return TRUE;
2449}
Note: See TracBrowser for help on using the repository browser.