source: trunk/libtransmission/peer-mgr.c @ 9470

Last change on this file since 9470 was 9470, checked in by charles, 13 years ago

(trunk libT) undo r9465 + r9466, which was the experimental new request manager. It still still needs some tinkering before it's ready for the nightlies...(trunk libT) undo r9465 + r9466, which was the experimental new request manager. It still still needs some tinkering before it's ready for the nightlies...(trunk libT) undo r9465 + r9466, which was the experimental new request manager. It still still needs some tinkering before it's ready for the nightlies...(trunk libT) undo r9465 + r9466, which was the experimental new request manager. It still still needs some tinkering before it's ready for the nightlies...(trunk libT) undo r9465 + r9466, which was the experimental new request manager. It still still needs some tinkering before it's ready for the nightlies...(trunk libT) undo r9465 + r9466, which was the experimental new request manager. It still still needs some tinkering before it's ready for the nightlies...(trunk libT) undo r9465 + r9466, which was the experimental new request manager. It still still needs some tinkering before it's ready for the nightlies...(trunk libT) undo r9465 + r9466, which was the experimental new request manager. It still still needs some tinkering before it's ready for the nightlies...(trunk libT) undo r9465 + r9466, which was the experimental new request manager. It still still needs some tinkering before it's ready for the nightlies...

  • Property svn:keywords set to Date Rev Author Id
File size: 75.4 KB
Line 
1/*
2 * This file Copyright (C) 2007-2009 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 9470 2009-11-02 00:17:30Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h> /* memcpy, memcmp, strstr */
16#include <stdlib.h> /* qsort */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "bandwidth.h"
23#include "bencode.h"
24#include "blocklist.h"
25#include "clients.h"
26#include "completion.h"
27#include "crypto.h"
28#include "handshake.h"
29#include "inout.h" /* tr_ioTestPiece */
30#include "net.h"
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-msgs.h"
34#include "ptrarray.h"
35#include "session.h"
36#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
37#include "torrent.h"
38#include "trevent.h"
39#include "utils.h"
40#include "webseed.h"
41
42enum
43{
44    /* how frequently to change which peers are choked */
45    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
46
47    /* minimum interval for refilling peers' request lists */
48    REFILL_PERIOD_MSEC = 400,
49
50    /* how frequently to reallocate bandwidth */
51    BANDWIDTH_PERIOD_MSEC = 500,
52
53    /* how frequently to age out old piece request lists */
54    REFILL_UPKEEP_PERIOD_MSEC = ( 10 * 1000 ),
55
56    /* how frequently to decide which peers live and die */
57    RECONNECT_PERIOD_MSEC = 500,
58
59    /* when many peers are available, keep idle ones this long */
60    MIN_UPLOAD_IDLE_SECS = ( 60 ),
61
62    /* when few peers are available, keep idle ones this long */
63    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
64
65    /* max # of peers to ask fer per torrent per reconnect pulse */
66    MAX_RECONNECTIONS_PER_PULSE = 8,
67
68    /* max number of peers to ask for per second overall.
69    * this throttle is to avoid overloading the router */
70    MAX_CONNECTIONS_PER_SECOND = 16,
71
72    /* number of bad pieces a peer is allowed to send before we ban them */
73    MAX_BAD_PIECES_PER_PEER = 5,
74
75    /* amount of time to keep a list of request pieces lying around
76       before it's considered too old and needs to be rebuilt */
77    PIECE_LIST_SHELF_LIFE_SECS = 60,
78
79    /* use for bitwise operations w/peer_atom.myflags */
80    MYFLAG_BANNED = 1,
81
82    /* use for bitwise operations w/peer_atom.myflags */
83    /* unreachable for now... but not banned.
84     * if they try to connect to us it's okay */
85    MYFLAG_UNREACHABLE = 2,
86
87    /* the minimum we'll wait before attempting to reconnect to a peer */
88    MINIMUM_RECONNECT_INTERVAL_SECS = 5
89};
90
91
92/**
93***
94**/
95
96enum
97{
98    UPLOAD_ONLY_UKNOWN,
99    UPLOAD_ONLY_YES,
100    UPLOAD_ONLY_NO
101};
102
103/**
104 * Peer information that should be kept even before we've connected and
105 * after we've disconnected.  These are kept in a pool of peer_atoms to decide
106 * which ones would make good candidates for connecting to, and to watch out
107 * for banned peers.
108 *
109 * @see tr_peer
110 * @see tr_peermsgs
111 */
112struct peer_atom
113{
114    uint8_t     from;
115    uint8_t     flags;       /* these match the added_f flags */
116    uint8_t     myflags;     /* flags that aren't defined in added_f */
117    uint8_t     uploadOnly;  /* UPLOAD_ONLY_ */
118    tr_port     port;
119    uint16_t    numFails;
120    tr_address  addr;
121    time_t      time;        /* when the peer's connection status last changed */
122    time_t      piece_data_time;
123};
124
125static tr_bool
126tr_isAtom( const struct peer_atom * atom )
127{
128    return ( atom != NULL )
129        && ( atom->from < TR_PEER_FROM__MAX )
130        && ( tr_isAddress( &atom->addr ) );
131}
132
133static const char*
134tr_atomAddrStr( const struct peer_atom * atom )
135{
136    return tr_peerIoAddrStr( &atom->addr, atom->port );
137}
138
139struct tr_blockIterator
140{
141    time_t expirationDate;
142    struct tr_torrent_peers * t;
143    tr_block_index_t blockIndex, blockCount, *blocks;
144    tr_piece_index_t pieceIndex, pieceCount, *pieces;
145};
146
147typedef struct tr_torrent_peers
148{
149    struct event               refillTimer;
150
151    tr_ptrArray                outgoingHandshakes; /* tr_handshake */
152    tr_ptrArray                pool; /* struct peer_atom */
153    tr_ptrArray                peers; /* tr_peer */
154    tr_ptrArray                webseeds; /* tr_webseed */
155
156    tr_torrent               * tor;
157    tr_peer                  * optimistic; /* the optimistic peer, or NULL if none */
158    struct tr_blockIterator  * refillQueue; /* used in refillPulse() */
159    struct tr_peerMgr        * manager;
160    int                      * pendingRequestCount;
161
162    tr_bool                    isRunning;
163}
164Torrent;
165
166struct tr_peerMgr
167{
168    tr_session      * session;
169    tr_ptrArray       incomingHandshakes; /* tr_handshake */
170    tr_timer        * bandwidthTimer;
171    tr_timer        * rechokeTimer;
172    tr_timer        * reconnectTimer;
173    tr_timer        * refillUpkeepTimer;
174};
175
176#define tordbg( t, ... ) \
177    do { \
178        if( tr_deepLoggingIsActive( ) ) \
179            tr_deepLog( __FILE__, __LINE__, tr_torrentName( t->tor ), __VA_ARGS__ ); \
180    } while( 0 )
181
182#define dbgmsg( ... ) \
183    do { \
184        if( tr_deepLoggingIsActive( ) ) \
185            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
186    } while( 0 )
187
188/**
189***
190**/
191
192static TR_INLINE void
193managerLock( const struct tr_peerMgr * manager )
194{
195    tr_globalLock( manager->session );
196}
197
198static TR_INLINE void
199managerUnlock( const struct tr_peerMgr * manager )
200{
201    tr_globalUnlock( manager->session );
202}
203
204static TR_INLINE void
205torrentLock( Torrent * torrent )
206{
207    managerLock( torrent->manager );
208}
209
210static TR_INLINE void
211torrentUnlock( Torrent * torrent )
212{
213    managerUnlock( torrent->manager );
214}
215
216static TR_INLINE int
217torrentIsLocked( const Torrent * t )
218{
219    return tr_globalIsLocked( t->manager->session );
220}
221
222/**
223***
224**/
225
226static int
227handshakeCompareToAddr( const void * va, const void * vb )
228{
229    const tr_handshake * a = va;
230
231    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
232}
233
234static int
235handshakeCompare( const void * a, const void * b )
236{
237    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
238}
239
240static tr_handshake*
241getExistingHandshake( tr_ptrArray      * handshakes,
242                      const tr_address * addr )
243{
244    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
245}
246
247static int
248comparePeerAtomToAddress( const void * va, const void * vb )
249{
250    const struct peer_atom * a = va;
251
252    return tr_compareAddresses( &a->addr, vb );
253}
254
255static int
256comparePeerAtoms( const void * va, const void * vb )
257{
258    const struct peer_atom * b = vb;
259
260    assert( tr_isAtom( b ) );
261
262    return comparePeerAtomToAddress( va, &b->addr );
263}
264
265/**
266***
267**/
268
269const tr_address *
270tr_peerAddress( const tr_peer * peer )
271{
272    return &peer->atom->addr;
273}
274
275static Torrent*
276getExistingTorrent( tr_peerMgr *    manager,
277                    const uint8_t * hash )
278{
279    tr_torrent * tor = tr_torrentFindFromHash( manager->session, hash );
280
281    return tor == NULL ? NULL : tor->torrentPeers;
282}
283
284static int
285peerCompare( const void * a, const void * b )
286{
287    return tr_compareAddresses( tr_peerAddress( a ), tr_peerAddress( b ) );
288}
289
290static int
291peerCompareToAddr( const void * a, const void * vb )
292{
293    return tr_compareAddresses( tr_peerAddress( a ), vb );
294}
295
296static tr_peer*
297getExistingPeer( Torrent          * torrent,
298                 const tr_address * addr )
299{
300    assert( torrentIsLocked( torrent ) );
301    assert( addr );
302
303    return tr_ptrArrayFindSorted( &torrent->peers, addr, peerCompareToAddr );
304}
305
306static struct peer_atom*
307getExistingAtom( const Torrent    * t,
308                 const tr_address * addr )
309{
310    Torrent * tt = (Torrent*)t;
311    assert( torrentIsLocked( t ) );
312    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
313}
314
315static tr_bool
316peerIsInUse( const Torrent    * ct,
317             const tr_address * addr )
318{
319    Torrent * t = (Torrent*) ct;
320
321    assert( torrentIsLocked ( t ) );
322
323    return getExistingPeer( t, addr )
324        || getExistingHandshake( &t->outgoingHandshakes, addr )
325        || getExistingHandshake( &t->manager->incomingHandshakes, addr );
326}
327
328static tr_peer*
329peerConstructor( struct peer_atom * atom )
330{
331    tr_peer * peer = tr_new0( tr_peer, 1 );
332    peer->atom = atom;
333    return peer;
334}
335
336static tr_peer*
337getPeer( Torrent * torrent, struct peer_atom * atom )
338{
339    tr_peer * peer;
340
341    assert( torrentIsLocked( torrent ) );
342
343    peer = getExistingPeer( torrent, &atom->addr );
344
345    if( peer == NULL )
346    {
347        peer = peerConstructor( atom );
348        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
349    }
350
351    return peer;
352}
353
354static void
355peerDestructor( tr_peer * peer )
356{
357    assert( peer );
358
359    if( peer->msgs != NULL )
360    {
361        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
362        tr_peerMsgsFree( peer->msgs );
363    }
364
365    tr_peerIoClear( peer->io );
366    tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
367
368    tr_bitfieldFree( peer->have );
369    tr_bitfieldFree( peer->blame );
370    tr_free( peer->client );
371
372    tr_free( peer );
373}
374
375static void
376removePeer( Torrent * t, tr_peer * peer )
377{
378    tr_peer * removed;
379    struct peer_atom * atom = peer->atom;
380
381    assert( torrentIsLocked( t ) );
382    assert( atom );
383
384    atom->time = time( NULL );
385
386    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
387    assert( removed == peer );
388    peerDestructor( removed );
389}
390
391static void
392removeAllPeers( Torrent * t )
393{
394    while( !tr_ptrArrayEmpty( &t->peers ) )
395        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
396}
397
398static void blockIteratorFree( struct tr_blockIterator ** inout );
399
400static void
401torrentDestructor( void * vt )
402{
403    Torrent * t = vt;
404
405    assert( t );
406    assert( !t->isRunning );
407    assert( torrentIsLocked( t ) );
408    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
409    assert( tr_ptrArrayEmpty( &t->peers ) );
410
411    evtimer_del( &t->refillTimer );
412
413    blockIteratorFree( &t->refillQueue );
414    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
415    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
416    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
417    tr_ptrArrayDestruct( &t->peers, NULL );
418
419    tr_free( t->pendingRequestCount );
420    tr_free( t );
421}
422
423
424static void refillPulse( int, short, void* );
425
426static void peerCallbackFunc( void * vpeer,
427                              void * vevent,
428                              void * vt );
429
430static Torrent*
431torrentConstructor( tr_peerMgr * manager,
432                    tr_torrent * tor )
433{
434    int       i;
435    Torrent * t;
436
437    t = tr_new0( Torrent, 1 );
438    t->manager = manager;
439    t->tor = tor;
440    t->pool = TR_PTR_ARRAY_INIT;
441    t->peers = TR_PTR_ARRAY_INIT;
442    t->webseeds = TR_PTR_ARRAY_INIT;
443    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
444    evtimer_set( &t->refillTimer, refillPulse, t );
445
446
447    for( i = 0; i < tor->info.webseedCount; ++i )
448    {
449        tr_webseed * w =
450            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
451        tr_ptrArrayAppend( &t->webseeds, w );
452    }
453
454    return t;
455}
456
457
458static int bandwidthPulse ( void * vmgr );
459static int rechokePulse   ( void * vmgr );
460static int reconnectPulse ( void * vmgr );
461static int refillUpkeep   ( void * vmgr );
462
463tr_peerMgr*
464tr_peerMgrNew( tr_session * session )
465{
466    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
467    m->session = session;
468    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
469    return m;
470}
471
472static void
473deleteTimers( struct tr_peerMgr * m )
474{
475    if( m->bandwidthTimer )
476        tr_timerFree( &m->bandwidthTimer );
477
478    if( m->rechokeTimer )
479        tr_timerFree( &m->rechokeTimer );
480
481    if( m->reconnectTimer )
482        tr_timerFree( &m->reconnectTimer );
483
484    if( m->refillUpkeepTimer )
485        tr_timerFree( &m->refillUpkeepTimer );
486}
487
488void
489tr_peerMgrFree( tr_peerMgr * manager )
490{
491    managerLock( manager );
492
493    deleteTimers( manager );
494
495    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
496     * the item from manager->handshakes, so this is a little roundabout... */
497    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
498        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
499
500    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
501
502    managerUnlock( manager );
503    tr_free( manager );
504}
505
506static int
507clientIsDownloadingFrom( const tr_peer * peer )
508{
509    return peer->clientIsInterested && !peer->clientIsChoked;
510}
511
512static int
513clientIsUploadingTo( const tr_peer * peer )
514{
515    return peer->peerIsInterested && !peer->peerIsChoked;
516}
517
518/***
519****
520***/
521
522tr_bool
523tr_peerMgrPeerIsSeed( const tr_torrent  * tor,
524                      const tr_address  * addr )
525{
526    tr_bool isSeed = FALSE;
527    const Torrent * t = tor->torrentPeers;
528    const struct peer_atom * atom = getExistingAtom( t, addr );
529
530    if( atom )
531        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
532
533    return isSeed;
534}
535
536/****
537*****
538*****  REFILL
539*****
540****/
541
542static void
543assertValidPiece( Torrent * t, tr_piece_index_t piece )
544{
545    assert( t );
546    assert( t->tor );
547    assert( piece < t->tor->info.pieceCount );
548}
549
550static int
551getPieceRequests( Torrent * t, tr_piece_index_t piece )
552{
553    assertValidPiece( t, piece );
554
555    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
556}
557
558static void
559incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
560{
561    assertValidPiece( t, piece );
562
563    if( t->pendingRequestCount == NULL )
564        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
565    t->pendingRequestCount[piece]++;
566}
567
568static void
569decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
570{
571    assertValidPiece( t, piece );
572
573    if( t->pendingRequestCount )
574        t->pendingRequestCount[piece]--;
575}
576
577struct tr_refill_piece
578{
579    tr_priority_t    priority;
580    uint32_t         piece;
581    uint32_t         peerCount;
582    int              random;
583    int              pendingRequestCount;
584    int              missingBlockCount;
585};
586
587static int
588compareRefillPiece( const void * aIn, const void * bIn )
589{
590    const struct tr_refill_piece * a = aIn;
591    const struct tr_refill_piece * b = bIn;
592
593    /* if one piece has a higher priority, it goes first */
594    if( a->priority != b->priority )
595        return a->priority > b->priority ? -1 : 1;
596
597    /* have a per-priority endgame */
598    if( a->pendingRequestCount != b->pendingRequestCount )
599        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
600
601    /* fewer missing pieces goes first */
602    if( a->missingBlockCount != b->missingBlockCount )
603        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
604
605    /* otherwise if one has fewer peers, it goes first */
606    if( a->peerCount != b->peerCount )
607        return a->peerCount < b->peerCount ? -1 : 1;
608
609    /* otherwise go with our random seed */
610    if( a->random != b->random )
611        return a->random < b->random ? -1 : 1;
612
613    return 0;
614}
615
616static tr_piece_index_t *
617getPreferredPieces( Torrent * t, tr_piece_index_t * pieceCount )
618{
619    const tr_torrent  * tor = t->tor;
620    const tr_info     * inf = &tor->info;
621    tr_piece_index_t    i;
622    tr_piece_index_t    poolSize = 0;
623    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
624    int                 peerCount;
625    const tr_peer    ** peers;
626
627    assert( torrentIsLocked( t ) );
628
629    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
630    peerCount = tr_ptrArraySize( &t->peers );
631
632    /* make a list of the pieces that we want but don't have */
633    for( i = 0; i < inf->pieceCount; ++i )
634        if( !tor->info.pieces[i].dnd
635                && !tr_cpPieceIsComplete( &tor->completion, i ) )
636            pool[poolSize++] = i;
637
638    /* sort the pool by which to request next */
639    if( poolSize > 1 )
640    {
641        tr_piece_index_t j;
642        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
643
644        for( j = 0; j < poolSize; ++j )
645        {
646            int k;
647            const tr_piece_index_t piece = pool[j];
648            struct tr_refill_piece * setme = p + j;
649
650            setme->piece = piece;
651            setme->priority = inf->pieces[piece].priority;
652            setme->peerCount = 0;
653            setme->random = tr_cryptoWeakRandInt( INT_MAX );
654            setme->pendingRequestCount = getPieceRequests( t, piece );
655            setme->missingBlockCount
656                         = tr_cpMissingBlocksInPiece( &tor->completion, piece );
657
658            for( k = 0; k < peerCount; ++k )
659            {
660                const tr_peer * peer = peers[k];
661                if( peer->peerIsInterested
662                        && !peer->clientIsChoked
663                        && tr_bitfieldHas( peer->have, piece ) )
664                    ++setme->peerCount;
665            }
666        }
667
668        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
669               compareRefillPiece );
670
671        for( j = 0; j < poolSize; ++j )
672            pool[j] = p[j].piece;
673
674        tr_free( p );
675    }
676
677    *pieceCount = poolSize;
678    return pool;
679}
680
681static struct tr_blockIterator*
682blockIteratorNew( Torrent * t )
683{
684    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
685    i->expirationDate = time( NULL ) + PIECE_LIST_SHELF_LIFE_SECS;
686    i->t = t;
687    i->pieces = getPreferredPieces( t, &i->pieceCount );
688    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCountInPiece );
689    tordbg( t, "creating new refill queue.. it contains %"PRIu32" pieces", i->pieceCount );
690    return i;
691}
692
693static tr_bool
694blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
695{
696    tr_bool found;
697    Torrent * t = i->t;
698    tr_torrent * tor = t->tor;
699
700    while( ( i->blockIndex == i->blockCount )
701        && ( i->pieceIndex < i->pieceCount ) )
702    {
703        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
704        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
705        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
706        tr_block_index_t block;
707
708        assert( index < tor->info.pieceCount );
709
710        i->blockCount = 0;
711        i->blockIndex = 0;
712        for( block=b; block!=e; ++block )
713            if( !tr_cpBlockIsCompleteFast( &tor->completion, block ) )
714                i->blocks[i->blockCount++] = block;
715    }
716
717    assert( i->blockCount <= tor->blockCountInPiece );
718
719    if(( found = ( i->blockIndex < i->blockCount )))
720        *setme = i->blocks[i->blockIndex++];
721
722    return found;
723}
724
725static void
726blockIteratorSkipCurrentPiece( struct tr_blockIterator * i )
727{
728    i->blockIndex = i->blockCount;
729}
730
731static void
732blockIteratorFree( struct tr_blockIterator ** inout )
733{
734    struct tr_blockIterator * it = *inout;
735
736    if( it != NULL )
737    {
738        tr_free( it->blocks );
739        tr_free( it->pieces );
740        tr_free( it );
741    }
742
743    *inout = NULL;
744}
745
746static tr_peer**
747getPeersUploadingToClient( Torrent * t,
748                           int *     setmeCount )
749{
750    int j;
751    int peerCount = 0;
752    int retCount = 0;
753    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
754    tr_peer ** ret = tr_new( tr_peer *, peerCount );
755
756    j = 0; /* this is a temporary test to make sure we walk through all the peers */
757    if( peerCount )
758    {
759        /* Get a list of peers we're downloading from.
760           Pick a different starting point each time so all peers
761           get a chance at being the first in line */
762        const int fencepost = tr_cryptoWeakRandInt( peerCount );
763        int i = fencepost;
764        do {
765            if( clientIsDownloadingFrom( peers[i] ) )
766                ret[retCount++] = peers[i];
767            i = ( i + 1 ) % peerCount;
768            ++j;
769        } while( i != fencepost );
770    }
771    assert( j == peerCount );
772    *setmeCount = retCount;
773    return ret;
774}
775
776static uint32_t
777getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
778{
779    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
780    const uint64_t blockPos = tor->blockSize * b;
781    assert( blockPos >= piecePos );
782    return (uint32_t)( blockPos - piecePos );
783}
784
785static int
786refillUpkeep( void * vmgr )
787{
788    tr_torrent * tor = NULL;
789    tr_peerMgr * mgr = vmgr;
790    time_t now;
791    managerLock( mgr );
792
793    now = time( NULL );
794    while(( tor = tr_torrentNext( mgr->session, tor ))) {
795        Torrent * t = tor->torrentPeers;
796        if( t && t->refillQueue && ( t->refillQueue->expirationDate <= now ) ) {
797            tordbg( t, "refill queue is past its shelf date; discarding." );
798            blockIteratorFree( &t->refillQueue );
799        }
800    }
801
802    managerUnlock( mgr );
803    return TRUE;
804}
805
806static void
807sortPeersByLiveliness( tr_peer ** peers, void ** clientData, int n, uint64_t now );
808
809static void
810refillPulse( int fd UNUSED, short type UNUSED, void * vtorrent )
811{
812    tr_block_index_t block;
813    int peerCount;
814    int webseedCount;
815    tr_peer ** peers;
816    tr_webseed ** webseeds;
817    Torrent * t = vtorrent;
818    tr_torrent * tor = t->tor;
819    tr_bool hasNext = TRUE;
820
821    if( !t->isRunning )
822        return;
823    if( tr_torrentIsSeed( t->tor ) )
824        return;
825
826    torrentLock( t );
827    tordbg( t, "Refilling Request Buffers..." );
828
829    if( t->refillQueue == NULL )
830        t->refillQueue = blockIteratorNew( t );
831
832    peers = getPeersUploadingToClient( t, &peerCount );
833    sortPeersByLiveliness( peers, NULL, peerCount, tr_date( ) );
834    webseedCount = tr_ptrArraySize( &t->webseeds );
835    webseeds = tr_memdup( tr_ptrArrayBase( &t->webseeds ),
836                          webseedCount * sizeof( tr_webseed* ) );
837
838    while( ( webseedCount || peerCount )
839        && (( hasNext = blockIteratorNext( t->refillQueue, &block ))) )
840    {
841        int j;
842        tr_bool handled = FALSE;
843
844        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
845        const uint32_t offset = getBlockOffsetInPiece( tor, block );
846        const uint32_t length = tr_torBlockCountBytes( tor, block );
847
848        assert( block < tor->blockCount );
849
850        /* find a peer who can ask for this block */
851        for( j=0; !handled && j<peerCount; )
852        {
853            const tr_addreq_t val = tr_peerMsgsAddRequest( peers[j]->msgs, index, offset, length );
854            switch( val )
855            {
856                case TR_ADDREQ_FULL:
857                case TR_ADDREQ_CLIENT_CHOKED:
858                    peers[j] = peers[--peerCount];
859                    break;
860
861                case TR_ADDREQ_MISSING:
862                case TR_ADDREQ_DUPLICATE:
863                    ++j;
864                    break;
865
866                case TR_ADDREQ_OK:
867                    incrementPieceRequests( t, index );
868                    handled = TRUE;
869                    break;
870
871                default:
872                    assert( 0 && "unhandled value" );
873                    break;
874            }
875        }
876
877        /* maybe one of the webseeds can do it */
878        for( j=0; !handled && j<webseedCount; )
879        {
880            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j], index, offset, length );
881            switch( val )
882            {
883                case TR_ADDREQ_FULL:
884                    webseeds[j] = webseeds[--webseedCount];
885                    break;
886
887                case TR_ADDREQ_OK:
888                    incrementPieceRequests( t, index );
889                    handled = TRUE;
890                    break;
891
892                default:
893                    assert( 0 && "unhandled value" );
894                    break;
895            }
896        }
897
898        if( !handled )
899            blockIteratorSkipCurrentPiece( t->refillQueue );
900    }
901
902    /* cleanup */
903    tr_free( webseeds );
904    tr_free( peers );
905
906    if( !hasNext ) {
907        tordbg( t, "refill queue has no more blocks to request... freeing (webseed count: %d, peer count: %d)", webseedCount, peerCount );
908        blockIteratorFree( &t->refillQueue );
909    }
910
911    torrentUnlock( t );
912}
913
914static void
915broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
916{
917    size_t i;
918    size_t peerCount;
919    tr_peer ** peers;
920
921    assert( torrentIsLocked( t ) );
922
923    tordbg( t, "got a block; cancelling any duplicate requests from peers %"PRIu32":%"PRIu32"->%"PRIu32, index, offset, length );
924
925    peerCount = tr_ptrArraySize( &t->peers );
926    peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
927    for( i=0; i<peerCount; ++i )
928        if( peers[i]->msgs )
929            tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
930}
931
932static void
933addStrike( Torrent * t, tr_peer * peer )
934{
935    tordbg( t, "increasing peer %s strike count to %d",
936            tr_atomAddrStr( peer->atom ), peer->strikes + 1 );
937
938    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
939    {
940        struct peer_atom * atom = peer->atom;
941        atom->myflags |= MYFLAG_BANNED;
942        peer->doPurge = 1;
943        tordbg( t, "banning peer %s", tr_atomAddrStr( atom ) );
944    }
945}
946
947static void
948gotBadPiece( Torrent *        t,
949             tr_piece_index_t pieceIndex )
950{
951    tr_torrent *   tor = t->tor;
952    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
953
954    tor->corruptCur += byteCount;
955    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
956}
957
958static void
959refillSoon( Torrent * t )
960{
961    if( !evtimer_pending( &t->refillTimer, NULL ) )
962        tr_timerAdd( &t->refillTimer, 0, REFILL_PERIOD_MSEC );
963}
964
965static void
966peerSuggestedPiece( Torrent            * t UNUSED,
967                    tr_peer            * peer UNUSED,
968                    tr_piece_index_t     pieceIndex UNUSED,
969                    int                  isFastAllowed UNUSED )
970{
971#if 0
972    assert( t );
973    assert( peer );
974    assert( peer->msgs );
975
976    /* is this a valid piece? */
977    if(  pieceIndex >= t->tor->info.pieceCount )
978        return;
979
980    /* don't ask for it if we've already got it */
981    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
982        return;
983
984    /* don't ask for it if they don't have it */
985    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
986        return;
987
988    /* don't ask for it if we're choked and it's not fast */
989    if( !isFastAllowed && peer->clientIsChoked )
990        return;
991
992    /* request the blocks that we don't have in this piece */
993    {
994        tr_block_index_t block;
995        const tr_torrent * tor = t->tor;
996        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
997        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
998
999        for( block=start; block<end; ++block )
1000        {
1001            if( !tr_cpBlockIsComplete( tor->completion, block ) )
1002            {
1003                const uint32_t offset = getBlockOffsetInPiece( tor, block );
1004                const uint32_t length = tr_torBlockCountBytes( tor, block );
1005                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
1006                incrementPieceRequests( t, pieceIndex );
1007            }
1008        }
1009    }
1010#endif
1011}
1012
1013static void
1014peerCallbackFunc( void * vpeer, void * vevent, void * vt )
1015{
1016    tr_peer * peer = vpeer; /* may be NULL if peer is a webseed */
1017    Torrent * t = vt;
1018    const tr_peer_event * e = vevent;
1019
1020    torrentLock( t );
1021
1022    switch( e->eventType )
1023    {
1024        case TR_PEER_UPLOAD_ONLY:
1025            /* update our atom */
1026            if( peer ) {
1027                if( e->uploadOnly ) {
1028                    peer->atom->uploadOnly = UPLOAD_ONLY_YES;
1029                    peer->atom->flags |= ADDED_F_SEED_FLAG;
1030                } else {
1031                    peer->atom->uploadOnly = UPLOAD_ONLY_NO;
1032                    peer->atom->flags &= ~ADDED_F_SEED_FLAG;
1033                }
1034            }
1035            break;
1036
1037        case TR_PEER_NEED_REQ:
1038            refillSoon( t );
1039            break;
1040
1041        case TR_PEER_CANCEL:
1042            decrementPieceRequests( t, e->pieceIndex );
1043            break;
1044
1045        case TR_PEER_PEER_GOT_DATA:
1046        {
1047            const time_t now = time( NULL );
1048            tr_torrent * tor = t->tor;
1049
1050            tr_torrentSetActivityDate( tor, now );
1051
1052            if( e->wasPieceData ) {
1053                tor->uploadedCur += e->length;
1054                tr_torrentSetDirty( tor );
1055            }
1056
1057            /* update the stats */
1058            if( e->wasPieceData )
1059                tr_statsAddUploaded( tor->session, e->length );
1060
1061            /* update our atom */
1062            if( peer && e->wasPieceData )
1063                peer->atom->piece_data_time = now;
1064
1065            tor->needsSeedRatioCheck = TRUE;
1066
1067            break;
1068        }
1069
1070        case TR_PEER_CLIENT_GOT_PORT:
1071            if( peer )
1072                peer->atom->port = e->port;
1073            break;
1074
1075        case TR_PEER_CLIENT_GOT_SUGGEST:
1076            if( peer )
1077                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1078            break;
1079
1080        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1081            if( peer )
1082                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1083            break;
1084
1085        case TR_PEER_CLIENT_GOT_DATA:
1086        {
1087            const time_t now = time( NULL );
1088            tr_torrent * tor = t->tor;
1089
1090            tr_torrentSetActivityDate( tor, now );
1091
1092            /* only add this to downloadedCur if we got it from a peer --
1093             * webseeds shouldn't count against our ratio.  As one tracker
1094             * admin put it, "Those pieces are downloaded directly from the
1095             * content distributor, not the peers, it is the tracker's job
1096             * to manage the swarms, not the web server and does not fit
1097             * into the jurisdiction of the tracker." */
1098            if( peer && e->wasPieceData ) {
1099                tor->downloadedCur += e->length;
1100                tr_torrentSetDirty( tor );
1101            }
1102
1103            /* update the stats */
1104            if( e->wasPieceData )
1105                tr_statsAddDownloaded( tor->session, e->length );
1106
1107            /* update our atom */
1108            if( peer && e->wasPieceData )
1109                peer->atom->piece_data_time = now;
1110
1111            break;
1112        }
1113
1114        case TR_PEER_PEER_PROGRESS:
1115        {
1116            if( peer )
1117            {
1118                struct peer_atom * atom = peer->atom;
1119                if( e->progress >= 1.0 ) {
1120                    tordbg( t, "marking peer %s as a seed",
1121                            tr_atomAddrStr( atom ) );
1122                    atom->flags |= ADDED_F_SEED_FLAG;
1123                }
1124            }
1125            break;
1126        }
1127
1128        case TR_PEER_CLIENT_GOT_BLOCK:
1129        {
1130            tr_torrent * tor = t->tor;
1131
1132            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1133
1134            tr_cpBlockAdd( &tor->completion, block );
1135            tr_torrentSetDirty( tor );
1136            decrementPieceRequests( t, e->pieceIndex );
1137
1138            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1139
1140            if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1141            {
1142                const tr_piece_index_t p = e->pieceIndex;
1143                const tr_bool ok = tr_ioTestPiece( tor, p, NULL, 0 );
1144
1145                if( !ok )
1146                {
1147                    tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1148                               (unsigned long)p );
1149                }
1150
1151                tr_torrentSetHasPiece( tor, p, ok );
1152                tr_torrentSetPieceChecked( tor, p, TRUE );
1153                tr_peerMgrSetBlame( tor, p, ok );
1154
1155                if( !ok )
1156                {
1157                    gotBadPiece( t, p );
1158                }
1159                else
1160                {
1161                    int i;
1162                    int peerCount;
1163                    tr_peer ** peers;
1164                    tr_file_index_t fileIndex;
1165
1166                    peerCount = tr_ptrArraySize( &t->peers );
1167                    peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1168                    for( i=0; i<peerCount; ++i )
1169                        tr_peerMsgsHave( peers[i]->msgs, p );
1170
1171                    for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex ) {
1172                        const tr_file * file = &tor->info.files[fileIndex];
1173                        if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) )
1174                            if( tr_cpFileIsComplete( &tor->completion, fileIndex ) )
1175                                tr_torrentFileCompleted( tor, fileIndex );
1176                    }
1177                }
1178            }
1179            break;
1180        }
1181
1182        case TR_PEER_ERROR:
1183            if( ( e->err == ERANGE ) || ( e->err == EMSGSIZE ) || ( e->err == ENOTCONN ) )
1184            {
1185                /* some protocol error from the peer */
1186                peer->doPurge = 1;
1187                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
1188                        tr_atomAddrStr( peer->atom ) );
1189            }
1190            else 
1191            {
1192                tordbg( t, "unhandled error: %s", tr_strerror( e->err ) );
1193            }
1194            break;
1195
1196        default:
1197            assert( 0 );
1198    }
1199
1200    torrentUnlock( t );
1201}
1202
1203static void
1204ensureAtomExists( Torrent          * t,
1205                  const tr_address * addr,
1206                  tr_port            port,
1207                  uint8_t            flags,
1208                  uint8_t            from )
1209{
1210    assert( tr_isAddress( addr ) );
1211    assert( from < TR_PEER_FROM__MAX );
1212
1213    if( getExistingAtom( t, addr ) == NULL )
1214    {
1215        struct peer_atom * a;
1216        a = tr_new0( struct peer_atom, 1 );
1217        a->addr = *addr;
1218        a->port = port;
1219        a->flags = flags;
1220        a->from = from;
1221        tordbg( t, "got a new atom: %s", tr_atomAddrStr( a ) );
1222        tr_ptrArrayInsertSorted( &t->pool, a, comparePeerAtoms );
1223    }
1224}
1225
1226static int
1227getMaxPeerCount( const tr_torrent * tor )
1228{
1229    return tor->maxConnectedPeers;
1230}
1231
1232static int
1233getPeerCount( const Torrent * t )
1234{
1235    return tr_ptrArraySize( &t->peers );/* + tr_ptrArraySize( &t->outgoingHandshakes ); */
1236}
1237
1238/* FIXME: this is kind of a mess. */
1239static tr_bool
1240myHandshakeDoneCB( tr_handshake  * handshake,
1241                   tr_peerIo     * io,
1242                   tr_bool         isConnected,
1243                   const uint8_t * peer_id,
1244                   void          * vmanager )
1245{
1246    tr_bool            ok = isConnected;
1247    tr_bool            success = FALSE;
1248    tr_port            port;
1249    const tr_address * addr;
1250    tr_peerMgr       * manager = vmanager;
1251    Torrent          * t;
1252    tr_handshake     * ours;
1253
1254    assert( io );
1255    assert( tr_isBool( ok ) );
1256
1257    t = tr_peerIoHasTorrentHash( io )
1258        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1259        : NULL;
1260
1261    if( tr_peerIoIsIncoming ( io ) )
1262        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1263                                        handshake, handshakeCompare );
1264    else if( t )
1265        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1266                                        handshake, handshakeCompare );
1267    else
1268        ours = handshake;
1269
1270    assert( ours );
1271    assert( ours == handshake );
1272
1273    if( t )
1274        torrentLock( t );
1275
1276    addr = tr_peerIoGetAddress( io, &port );
1277
1278    if( !ok || !t || !t->isRunning )
1279    {
1280        if( t )
1281        {
1282            struct peer_atom * atom = getExistingAtom( t, addr );
1283            if( atom )
1284                ++atom->numFails;
1285        }
1286    }
1287    else /* looking good */
1288    {
1289        struct peer_atom * atom;
1290        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1291        atom = getExistingAtom( t, addr );
1292        atom->time = time( NULL );
1293        atom->piece_data_time = 0;
1294
1295        if( atom->myflags & MYFLAG_BANNED )
1296        {
1297            tordbg( t, "banned peer %s tried to reconnect",
1298                    tr_atomAddrStr( atom ) );
1299        }
1300        else if( tr_peerIoIsIncoming( io )
1301               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1302
1303        {
1304        }
1305        else
1306        {
1307            tr_peer * peer = getExistingPeer( t, addr );
1308
1309            if( peer ) /* we already have this peer */
1310            {
1311            }
1312            else
1313            {
1314                peer = getPeer( t, atom );
1315                tr_free( peer->client );
1316
1317                if( !peer_id )
1318                    peer->client = NULL;
1319                else {
1320                    char client[128];
1321                    tr_clientForId( client, sizeof( client ), peer_id );
1322                    peer->client = tr_strdup( client );
1323                }
1324
1325                peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is
1326                                                                balanced by our unref in peerDestructor()  */
1327                tr_peerIoSetParent( peer->io, t->tor->bandwidth );
1328                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1329
1330                success = TRUE;
1331            }
1332        }
1333    }
1334
1335    if( t )
1336        torrentUnlock( t );
1337
1338    return success;
1339}
1340
1341void
1342tr_peerMgrAddIncoming( tr_peerMgr * manager,
1343                       tr_address * addr,
1344                       tr_port      port,
1345                       int          socket )
1346{
1347    tr_session * session;
1348
1349    managerLock( manager );
1350
1351    assert( tr_isSession( manager->session ) );
1352    session = manager->session;
1353
1354    if( tr_sessionIsAddressBlocked( session, addr ) )
1355    {
1356        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
1357        tr_netClose( session, socket );
1358    }
1359    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
1360    {
1361        tr_netClose( session, socket );
1362    }
1363    else /* we don't have a connetion to them yet... */
1364    {
1365        tr_peerIo *    io;
1366        tr_handshake * handshake;
1367
1368        io = tr_peerIoNewIncoming( session, session->bandwidth, addr, port, socket );
1369
1370        handshake = tr_handshakeNew( io,
1371                                     session->encryptionMode,
1372                                     myHandshakeDoneCB,
1373                                     manager );
1374
1375        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
1376
1377        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
1378                                 handshakeCompare );
1379    }
1380
1381    managerUnlock( manager );
1382}
1383
1384static tr_bool
1385tr_isPex( const tr_pex * pex )
1386{
1387    return pex && tr_isAddress( &pex->addr );
1388}
1389
1390void
1391tr_peerMgrAddPex( tr_torrent   *  tor,
1392                  uint8_t         from,
1393                  const tr_pex *  pex )
1394{
1395    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
1396    {
1397        Torrent * t = tor->torrentPeers;
1398        managerLock( t->manager );
1399
1400        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
1401            if( tr_isValidPeerAddress( &pex->addr, pex->port ) )
1402                ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from );
1403
1404        managerUnlock( t->manager );
1405    }
1406}
1407
1408tr_pex *
1409tr_peerMgrCompactToPex( const void *    compact,
1410                        size_t          compactLen,
1411                        const uint8_t * added_f,
1412                        size_t          added_f_len,
1413                        size_t *        pexCount )
1414{
1415    size_t          i;
1416    size_t          n = compactLen / 6;
1417    const uint8_t * walk = compact;
1418    tr_pex *        pex = tr_new0( tr_pex, n );
1419
1420    for( i = 0; i < n; ++i )
1421    {
1422        pex[i].addr.type = TR_AF_INET;
1423        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
1424        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1425        if( added_f && ( n == added_f_len ) )
1426            pex[i].flags = added_f[i];
1427    }
1428
1429    *pexCount = n;
1430    return pex;
1431}
1432
1433tr_pex *
1434tr_peerMgrCompact6ToPex( const void    * compact,
1435                         size_t          compactLen,
1436                         const uint8_t * added_f,
1437                         size_t          added_f_len,
1438                         size_t        * pexCount )
1439{
1440    size_t          i;
1441    size_t          n = compactLen / 18;
1442    const uint8_t * walk = compact;
1443    tr_pex *        pex = tr_new0( tr_pex, n );
1444
1445    for( i = 0; i < n; ++i )
1446    {
1447        pex[i].addr.type = TR_AF_INET6;
1448        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
1449        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1450        if( added_f && ( n == added_f_len ) )
1451            pex[i].flags = added_f[i];
1452    }
1453
1454    *pexCount = n;
1455    return pex;
1456}
1457
1458tr_pex *
1459tr_peerMgrArrayToPex( const void * array,
1460                      size_t       arrayLen,
1461                      size_t      * pexCount )
1462{
1463    size_t          i;
1464    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
1465    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
1466    const uint8_t * walk = array;
1467    tr_pex        * pex = tr_new0( tr_pex, n );
1468
1469    for( i = 0 ; i < n ; i++ ) {
1470        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
1471        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
1472        pex[i].flags = 0x00;
1473        walk += sizeof( tr_address ) + 2;
1474    }
1475
1476    *pexCount = n;
1477    return pex;
1478}
1479
1480/**
1481***
1482**/
1483
1484void
1485tr_peerMgrSetBlame( tr_torrent     * tor,
1486                    tr_piece_index_t pieceIndex,
1487                    int              success )
1488{
1489    if( !success )
1490    {
1491        int        peerCount, i;
1492        Torrent *  t = tor->torrentPeers;
1493        tr_peer ** peers;
1494
1495        assert( torrentIsLocked( t ) );
1496
1497        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
1498        for( i = 0; i < peerCount; ++i )
1499        {
1500            tr_peer * peer = peers[i];
1501            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1502            {
1503                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1504                        tr_atomAddrStr( peer->atom ),
1505                        pieceIndex, (int)peer->strikes + 1 );
1506                addStrike( t, peer );
1507            }
1508        }
1509    }
1510}
1511
1512int
1513tr_pexCompare( const void * va, const void * vb )
1514{
1515    const tr_pex * a = va;
1516    const tr_pex * b = vb;
1517    int i;
1518
1519    assert( tr_isPex( a ) );
1520    assert( tr_isPex( b ) );
1521
1522    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
1523        return i;
1524
1525    if( a->port != b->port )
1526        return a->port < b->port ? -1 : 1;
1527
1528    return 0;
1529}
1530
1531#if 0
1532static int
1533peerPrefersCrypto( const tr_peer * peer )
1534{
1535    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1536        return TRUE;
1537
1538    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1539        return FALSE;
1540
1541    return tr_peerIoIsEncrypted( peer->io );
1542}
1543#endif
1544
1545/* better goes first */
1546static int
1547compareAtomsByUsefulness( const void * va, const void *vb )
1548{
1549    const struct peer_atom * a = * (const struct peer_atom**) va;
1550    const struct peer_atom * b = * (const struct peer_atom**) vb;
1551
1552    assert( tr_isAtom( a ) );
1553    assert( tr_isAtom( b ) );
1554
1555    if( a->piece_data_time != b->piece_data_time )
1556        return a->piece_data_time > b->piece_data_time ? -1 : 1;
1557    if( a->from != b->from )
1558        return a->from < b->from ? -1 : 1;
1559    if( a->numFails != b->numFails )
1560        return a->numFails < b->numFails ? -1 : 1;
1561
1562    return 0;
1563}
1564
1565int
1566tr_peerMgrGetPeers( tr_torrent * tor, tr_pex ** setme_pex, uint8_t af, int maxPeerCount )
1567{
1568    int count = 0;
1569    const Torrent * t = tor->torrentPeers;
1570
1571    managerLock( t->manager );
1572
1573    {
1574        int i;
1575        const int atomCount = tr_ptrArraySize( &t->pool );
1576        const int pexCount = MIN( atomCount, maxPeerCount );
1577        const struct peer_atom ** atomsBase = (const struct peer_atom**) tr_ptrArrayBase( &t->pool );
1578        struct peer_atom ** atoms = tr_memdup( atomsBase, atomCount * sizeof( struct peer_atom * ) );
1579        /* for now, this will waste memory on torrents that have both
1580         * ipv6 and ipv4 peers */
1581        tr_pex * pex = tr_new0( tr_pex, atomCount );
1582        tr_pex * walk = pex;
1583
1584        qsort( atoms, atomCount, sizeof( struct peer_atom * ), compareAtomsByUsefulness );
1585
1586        for( i=0; i<atomCount && count<pexCount; ++i )
1587        {
1588            const struct peer_atom * atom = atoms[i];
1589            if( atom->addr.type == af )
1590            {
1591                assert( tr_isAddress( &atom->addr ) );
1592                walk->addr = atom->addr;
1593                walk->port = atom->port;
1594                walk->flags = atom->flags;
1595                ++count;
1596                ++walk;
1597            }
1598        }
1599
1600        assert( ( walk - pex ) == count );
1601        *setme_pex = pex;
1602
1603        tr_free( atoms );
1604    }
1605
1606    managerUnlock( t->manager );
1607    return count;
1608}
1609
1610static void
1611ensureMgrTimersExist( struct tr_peerMgr * m )
1612{
1613    tr_session * s = m->session;
1614
1615    if( m->bandwidthTimer == NULL )
1616        m->bandwidthTimer = tr_timerNew( s, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
1617
1618    if( m->rechokeTimer == NULL )
1619        m->rechokeTimer = tr_timerNew( s, rechokePulse, m, RECHOKE_PERIOD_MSEC );
1620
1621    if( m->reconnectTimer == NULL )
1622        m->reconnectTimer = tr_timerNew( s, reconnectPulse, m, RECONNECT_PERIOD_MSEC );
1623
1624    if( m->refillUpkeepTimer == NULL )
1625        m->refillUpkeepTimer = tr_timerNew( s, refillUpkeep, m, REFILL_UPKEEP_PERIOD_MSEC );
1626}
1627
1628void
1629tr_peerMgrStartTorrent( tr_torrent * tor )
1630{
1631    Torrent * t = tor->torrentPeers;
1632
1633    assert( t != NULL );
1634    managerLock( t->manager );
1635    ensureMgrTimersExist( t->manager );
1636
1637    if( !t->isRunning )
1638    {
1639        t->isRunning = TRUE;
1640
1641        if( !tr_ptrArrayEmpty( &t->webseeds ) )
1642            refillSoon( t );
1643    }
1644
1645    rechokePulse( t->manager );
1646    managerUnlock( t->manager );
1647}
1648
1649static void
1650stopTorrent( Torrent * t )
1651{
1652    assert( torrentIsLocked( t ) );
1653
1654    t->isRunning = FALSE;
1655
1656    /* disconnect the peers. */
1657    tr_ptrArrayForeach( &t->peers, (PtrArrayForeachFunc)peerDestructor );
1658    tr_ptrArrayClear( &t->peers );
1659
1660    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1661     * which removes the handshake from t->outgoingHandshakes... */
1662    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
1663        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
1664}
1665
1666void
1667tr_peerMgrStopTorrent( tr_torrent * tor )
1668{
1669    Torrent * t = tor->torrentPeers;
1670
1671    managerLock( t->manager );
1672
1673    stopTorrent( t );
1674
1675    managerUnlock( t->manager );
1676}
1677
1678void
1679tr_peerMgrAddTorrent( tr_peerMgr * manager,
1680                      tr_torrent * tor )
1681{
1682    managerLock( manager );
1683
1684    assert( tor );
1685    assert( tor->torrentPeers == NULL );
1686
1687    tor->torrentPeers = torrentConstructor( manager, tor );
1688
1689    managerUnlock( manager );
1690}
1691
1692void
1693tr_peerMgrRemoveTorrent( tr_torrent * tor )
1694{
1695    tr_torrentLock( tor );
1696
1697    stopTorrent( tor->torrentPeers );
1698    torrentDestructor( tor->torrentPeers );
1699
1700    tr_torrentUnlock( tor );
1701}
1702
1703void
1704tr_peerMgrTorrentAvailability( const tr_torrent * tor,
1705                               int8_t           * tab,
1706                               unsigned int       tabCount )
1707{
1708    tr_piece_index_t   i;
1709    const Torrent *    t;
1710    float              interval;
1711    tr_bool            isSeed;
1712    int                peerCount;
1713    const tr_peer **   peers;
1714    tr_torrentLock( tor );
1715
1716    t = tor->torrentPeers;
1717    tor = t->tor;
1718    interval = tor->info.pieceCount / (float)tabCount;
1719    isSeed = tor && ( tr_cpGetStatus ( &tor->completion ) == TR_SEED );
1720    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1721    peerCount = tr_ptrArraySize( &t->peers );
1722
1723    memset( tab, 0, tabCount );
1724
1725    for( i = 0; tor && i < tabCount; ++i )
1726    {
1727        const int piece = i * interval;
1728
1729        if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
1730            tab[i] = -1;
1731        else if( peerCount ) {
1732            int j;
1733            for( j = 0; j < peerCount; ++j )
1734                if( tr_bitfieldHas( peers[j]->have, i ) )
1735                    ++tab[i];
1736        }
1737    }
1738
1739    tr_torrentUnlock( tor );
1740}
1741
1742/* Returns the pieces that are available from peers */
1743tr_bitfield*
1744tr_peerMgrGetAvailable( const tr_torrent * tor )
1745{
1746    int i;
1747    int peerCount;
1748    Torrent * t = tor->torrentPeers;
1749    const tr_peer ** peers;
1750    tr_bitfield * pieces;
1751    managerLock( t->manager );
1752
1753    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1754    peerCount = tr_ptrArraySize( &t->peers );
1755    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1756    for( i=0; i<peerCount; ++i )
1757        tr_bitfieldOr( pieces, peers[i]->have );
1758
1759    managerUnlock( t->manager );
1760    return pieces;
1761}
1762
1763void
1764tr_peerMgrTorrentStats( tr_torrent       * tor,
1765                        int              * setmePeersKnown,
1766                        int              * setmePeersConnected,
1767                        int              * setmeSeedsConnected,
1768                        int              * setmeWebseedsSendingToUs,
1769                        int              * setmePeersSendingToUs,
1770                        int              * setmePeersGettingFromUs,
1771                        int              * setmePeersFrom )
1772{
1773    int i, size;
1774    const Torrent * t = tor->torrentPeers;
1775    const tr_peer ** peers;
1776    const tr_webseed ** webseeds;
1777
1778    managerLock( t->manager );
1779
1780    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1781    size = tr_ptrArraySize( &t->peers );
1782
1783    *setmePeersKnown           = tr_ptrArraySize( &t->pool );
1784    *setmePeersConnected       = 0;
1785    *setmeSeedsConnected       = 0;
1786    *setmePeersGettingFromUs   = 0;
1787    *setmePeersSendingToUs     = 0;
1788    *setmeWebseedsSendingToUs  = 0;
1789
1790    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1791        setmePeersFrom[i] = 0;
1792
1793    for( i=0; i<size; ++i )
1794    {
1795        const tr_peer * peer = peers[i];
1796        const struct peer_atom * atom = peer->atom;
1797
1798        if( peer->io == NULL ) /* not connected */
1799            continue;
1800
1801        ++*setmePeersConnected;
1802
1803        ++setmePeersFrom[atom->from];
1804
1805        if( clientIsDownloadingFrom( peer ) )
1806            ++*setmePeersSendingToUs;
1807
1808        if( clientIsUploadingTo( peer ) )
1809            ++*setmePeersGettingFromUs;
1810
1811        if( atom->flags & ADDED_F_SEED_FLAG )
1812            ++*setmeSeedsConnected;
1813    }
1814
1815    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1816    size = tr_ptrArraySize( &t->webseeds );
1817    for( i=0; i<size; ++i )
1818        if( tr_webseedIsActive( webseeds[i] ) )
1819            ++*setmeWebseedsSendingToUs;
1820
1821    managerUnlock( t->manager );
1822}
1823
1824float
1825tr_peerMgrGetWebseedSpeed( const tr_torrent * tor, uint64_t now )
1826{
1827    int i;
1828    float tmp;
1829    float ret = 0;
1830
1831    const Torrent * t = tor->torrentPeers;
1832    const int n = tr_ptrArraySize( &t->webseeds );
1833    const tr_webseed ** webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1834
1835    for( i=0; i<n; ++i )
1836        if( tr_webseedGetSpeed( webseeds[i], now, &tmp ) )
1837            ret += tmp;
1838
1839    return ret;
1840}
1841
1842
1843float*
1844tr_peerMgrWebSpeeds( const tr_torrent * tor )
1845{
1846    const Torrent * t = tor->torrentPeers;
1847    const tr_webseed ** webseeds;
1848    int i;
1849    int webseedCount;
1850    float * ret;
1851    uint64_t now;
1852
1853    assert( t->manager );
1854    managerLock( t->manager );
1855
1856    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1857    webseedCount = tr_ptrArraySize( &t->webseeds );
1858    assert( webseedCount == tor->info.webseedCount );
1859    ret = tr_new0( float, webseedCount );
1860    now = tr_date( );
1861
1862    for( i=0; i<webseedCount; ++i )
1863        if( !tr_webseedGetSpeed( webseeds[i], now, &ret[i] ) )
1864            ret[i] = -1.0;
1865
1866    managerUnlock( t->manager );
1867    return ret;
1868}
1869
1870double
1871tr_peerGetPieceSpeed( const tr_peer * peer, uint64_t now, tr_direction direction )
1872{
1873    return peer->io ? tr_peerIoGetPieceSpeed( peer->io, now, direction ) : 0.0;
1874}
1875
1876
1877struct tr_peer_stat *
1878tr_peerMgrPeerStats( const tr_torrent    * tor,
1879                     int                 * setmeCount )
1880{
1881    int i, size;
1882    const Torrent * t = tor->torrentPeers;
1883    const tr_peer ** peers;
1884    tr_peer_stat * ret;
1885    uint64_t now;
1886
1887    assert( t->manager );
1888    managerLock( t->manager );
1889
1890    size = tr_ptrArraySize( &t->peers );
1891    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1892    ret = tr_new0( tr_peer_stat, size );
1893    now = tr_date( );
1894
1895    for( i=0; i<size; ++i )
1896    {
1897        char *                   pch;
1898        const tr_peer *          peer = peers[i];
1899        const struct peer_atom * atom = peer->atom;
1900        tr_peer_stat *           stat = ret + i;
1901
1902        tr_ntop( &atom->addr, stat->addr, sizeof( stat->addr ) );
1903        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
1904                   sizeof( stat->client ) );
1905        stat->port               = ntohs( peer->atom->port );
1906        stat->from               = atom->from;
1907        stat->progress           = peer->progress;
1908        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1909        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER );
1910        stat->rateToClient       = tr_peerGetPieceSpeed( peer, now, TR_PEER_TO_CLIENT );
1911        stat->peerIsChoked       = peer->peerIsChoked;
1912        stat->peerIsInterested   = peer->peerIsInterested;
1913        stat->clientIsChoked     = peer->clientIsChoked;
1914        stat->clientIsInterested = peer->clientIsInterested;
1915        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1916        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1917        stat->isUploadingTo      = clientIsUploadingTo( peer );
1918        stat->isSeed             = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
1919
1920        pch = stat->flagStr;
1921        if( t->optimistic == peer ) *pch++ = 'O';
1922        if( stat->isDownloadingFrom ) *pch++ = 'D';
1923        else if( stat->clientIsInterested ) *pch++ = 'd';
1924        if( stat->isUploadingTo ) *pch++ = 'U';
1925        else if( stat->peerIsInterested ) *pch++ = 'u';
1926        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
1927        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1928        if( stat->isEncrypted ) *pch++ = 'E';
1929        if( stat->from == TR_PEER_FROM_DHT ) *pch++ = 'H';
1930        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1931        if( stat->isIncoming ) *pch++ = 'I';
1932        *pch = '\0';
1933    }
1934
1935    *setmeCount = size;
1936
1937    managerUnlock( t->manager );
1938    return ret;
1939}
1940
1941/**
1942***
1943**/
1944
1945struct ChokeData
1946{
1947    tr_bool         doUnchoke;
1948    tr_bool         isInterested;
1949    tr_bool         isChoked;
1950    int             rate;
1951    tr_peer *       peer;
1952};
1953
1954static int
1955compareChoke( const void * va,
1956              const void * vb )
1957{
1958    const struct ChokeData * a = va;
1959    const struct ChokeData * b = vb;
1960
1961    if( a->rate != b->rate ) /* prefer higher overall speeds */
1962        return a->rate > b->rate ? -1 : 1;
1963
1964    if( a->isChoked != b->isChoked ) /* prefer unchoked */
1965        return a->isChoked ? 1 : -1;
1966
1967    return 0;
1968}
1969
1970static int
1971isNew( const tr_peer * peer )
1972{
1973    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1974}
1975
1976static int
1977isSame( const tr_peer * peer )
1978{
1979    return peer && peer->client && strstr( peer->client, "Transmission" );
1980}
1981
1982/**
1983***
1984**/
1985
1986static void
1987rechokeTorrent( Torrent * t, const uint64_t now )
1988{
1989    int i, size, unchokedInterested;
1990    const int peerCount = tr_ptrArraySize( &t->peers );
1991    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1992    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1993    const tr_session * session = t->manager->session;
1994    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
1995
1996    assert( torrentIsLocked( t ) );
1997
1998    /* sort the peers by preference and rate */
1999    for( i = 0, size = 0; i < peerCount; ++i )
2000    {
2001        tr_peer * peer = peers[i];
2002        struct peer_atom * atom = peer->atom;
2003
2004        if( peer->progress >= 1.0 ) /* choke all seeds */
2005        {
2006            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2007        }
2008        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
2009        {
2010            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2011        }
2012        else if( chokeAll ) /* choke everyone if we're not uploading */
2013        {
2014            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2015        }
2016        else
2017        {
2018            struct ChokeData * n = &choke[size++];
2019            n->peer         = peer;
2020            n->isInterested = peer->peerIsInterested;
2021            n->isChoked     = peer->peerIsChoked;
2022            n->rate         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER ) * 1024;
2023        }
2024    }
2025
2026    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
2027
2028    /**
2029     * Reciprocation and number of uploads capping is managed by unchoking
2030     * the N peers which have the best upload rate and are interested.
2031     * This maximizes the client's download rate. These N peers are
2032     * referred to as downloaders, because they are interested in downloading
2033     * from the client.
2034     *
2035     * Peers which have a better upload rate (as compared to the downloaders)
2036     * but aren't interested get unchoked. If they become interested, the
2037     * downloader with the worst upload rate gets choked. If a client has
2038     * a complete file, it uses its upload rate rather than its download
2039     * rate to decide which peers to unchoke.
2040     */
2041    unchokedInterested = 0;
2042    for( i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i ) {
2043        choke[i].doUnchoke = 1;
2044        if( choke[i].isInterested )
2045            ++unchokedInterested;
2046    }
2047
2048    /* optimistic unchoke */
2049    if( i < size )
2050    {
2051        int n;
2052        struct ChokeData * c;
2053        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
2054
2055        for( ; i<size; ++i )
2056        {
2057            if( choke[i].isInterested )
2058            {
2059                const tr_peer * peer = choke[i].peer;
2060                int x = 1, y;
2061                if( isNew( peer ) ) x *= 3;
2062                if( isSame( peer ) ) x *= 3;
2063                for( y=0; y<x; ++y )
2064                    tr_ptrArrayAppend( &randPool, &choke[i] );
2065            }
2066        }
2067
2068        if(( n = tr_ptrArraySize( &randPool )))
2069        {
2070            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
2071            c->doUnchoke = 1;
2072            t->optimistic = c->peer;
2073        }
2074
2075        tr_ptrArrayDestruct( &randPool, NULL );
2076    }
2077
2078    for( i=0; i<size; ++i )
2079        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
2080
2081    /* cleanup */
2082    tr_free( choke );
2083}
2084
2085static int
2086rechokePulse( void * vmgr )
2087{
2088    uint64_t now;
2089    tr_torrent * tor = NULL;
2090    tr_peerMgr * mgr = vmgr;
2091    managerLock( mgr );
2092
2093    now = tr_date( );
2094    while(( tor = tr_torrentNext( mgr->session, tor )))
2095        if( tor->isRunning )
2096            rechokeTorrent( tor->torrentPeers, now );
2097
2098    managerUnlock( mgr );
2099    return TRUE;
2100}
2101
2102/***
2103****
2104****  Life and Death
2105****
2106***/
2107
2108typedef enum
2109{
2110    TR_CAN_KEEP,
2111    TR_CAN_CLOSE,
2112    TR_MUST_CLOSE,
2113}
2114tr_close_type_t;
2115
2116static tr_close_type_t
2117shouldPeerBeClosed( const Torrent    * t,
2118                    const tr_peer    * peer,
2119                    int                peerCount )
2120{
2121    const tr_torrent *       tor = t->tor;
2122    const time_t             now = time( NULL );
2123    const struct peer_atom * atom = peer->atom;
2124
2125    /* if it's marked for purging, close it */
2126    if( peer->doPurge )
2127    {
2128        tordbg( t, "purging peer %s because its doPurge flag is set",
2129                tr_atomAddrStr( atom ) );
2130        return TR_MUST_CLOSE;
2131    }
2132
2133    /* if we're seeding and the peer has everything we have,
2134     * and enough time has passed for a pex exchange, then disconnect */
2135    if( tr_torrentIsSeed( tor ) )
2136    {
2137        int peerHasEverything;
2138        if( atom->flags & ADDED_F_SEED_FLAG )
2139            peerHasEverything = TRUE;
2140        else if( peer->progress < tr_cpPercentDone( &tor->completion ) )
2141            peerHasEverything = FALSE;
2142        else {
2143            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( &tor->completion ) );
2144            tr_bitfieldDifference( tmp, peer->have );
2145            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2146            tr_bitfieldFree( tmp );
2147        }
2148
2149        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 )))
2150        {
2151            tordbg( t, "purging peer %s because we're both seeds",
2152                    tr_atomAddrStr( atom ) );
2153            return TR_MUST_CLOSE;
2154        }
2155    }
2156
2157    /* disconnect if it's been too long since piece data has been transferred.
2158     * this is on a sliding scale based on number of available peers... */
2159    {
2160        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2161        /* if we have >= relaxIfFewerThan, strictness is 100%.
2162         * if we have zero connections, strictness is 0% */
2163        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
2164                               ? 1.0
2165                               : peerCount / (float)relaxStrictnessIfFewerThanN;
2166        const int lo = MIN_UPLOAD_IDLE_SECS;
2167        const int hi = MAX_UPLOAD_IDLE_SECS;
2168        const int limit = hi - ( ( hi - lo ) * strictness );
2169        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
2170/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
2171        if( idleTime > limit ) {
2172            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
2173                       tr_atomAddrStr( atom ), idleTime );
2174            return TR_CAN_CLOSE;
2175        }
2176    }
2177
2178    return TR_CAN_KEEP;
2179}
2180
2181static void sortPeersByLivelinessReverse( tr_peer ** peers, void ** clientData, int n, uint64_t now );
2182
2183static tr_peer **
2184getPeersToClose( Torrent * t, tr_close_type_t closeType, int * setmeSize )
2185{
2186    int i, peerCount, outsize;
2187    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
2188    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2189
2190    assert( torrentIsLocked( t ) );
2191
2192    for( i = outsize = 0; i < peerCount; ++i )
2193        if( shouldPeerBeClosed( t, peers[i], peerCount ) == closeType )
2194            ret[outsize++] = peers[i];
2195
2196    sortPeersByLivelinessReverse ( ret, NULL, outsize, tr_date( ) );
2197
2198    *setmeSize = outsize;
2199    return ret;
2200}
2201
2202static int
2203compareCandidates( const void * va, const void * vb )
2204{
2205    const struct peer_atom * a = *(const struct peer_atom**) va;
2206    const struct peer_atom * b = *(const struct peer_atom**) vb;
2207
2208    /* <Charles> Here we would probably want to try reconnecting to
2209     * peers that had most recently given us data. Lots of users have
2210     * trouble with resets due to their routers and/or ISPs. This way we
2211     * can quickly recover from an unwanted reset. So we sort
2212     * piece_data_time in descending order.
2213     */
2214
2215    if( a->piece_data_time != b->piece_data_time )
2216        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2217
2218    if( a->numFails != b->numFails )
2219        return a->numFails < b->numFails ? -1 : 1;
2220
2221    if( a->time != b->time )
2222        return a->time < b->time ? -1 : 1;
2223
2224    /* In order to avoid fragmenting the swarm, peers from trackers and
2225     * from the DHT should be preferred to peers from PEX. */
2226    if( a->from != b->from )
2227        return a->from < b->from ? -1 : 1;
2228
2229    return 0;
2230}
2231
2232static int
2233getReconnectIntervalSecs( const struct peer_atom * atom )
2234{
2235    int          sec;
2236    const time_t now = time( NULL );
2237
2238    /* if we were recently connected to this peer and transferring piece
2239     * data, try to reconnect to them sooner rather that later -- we don't
2240     * want network troubles to get in the way of a good peer. */
2241    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2242        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2243
2244    /* don't allow reconnects more often than our minimum */
2245    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2246        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2247
2248    /* otherwise, the interval depends on how many times we've tried
2249     * and failed to connect to the peer */
2250    else switch( atom->numFails ) {
2251        case 0: sec = 0; break;
2252        case 1: sec = 5; break;
2253        case 2: sec = 2 * 60; break;
2254        case 3: sec = 15 * 60; break;
2255        case 4: sec = 30 * 60; break;
2256        case 5: sec = 60 * 60; break;
2257        default: sec = 120 * 60; break;
2258    }
2259
2260    return sec;
2261}
2262
2263static struct peer_atom **
2264getPeerCandidates( Torrent * t, int * setmeSize )
2265{
2266    int                 i, atomCount, retCount;
2267    struct peer_atom ** atoms;
2268    struct peer_atom ** ret;
2269    const time_t        now = time( NULL );
2270    const int           seed = tr_torrentIsSeed( t->tor );
2271
2272    assert( torrentIsLocked( t ) );
2273
2274    atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
2275    ret = tr_new( struct peer_atom*, atomCount );
2276    for( i = retCount = 0; i < atomCount; ++i )
2277    {
2278        int                interval;
2279        struct peer_atom * atom = atoms[i];
2280
2281        /* peer fed us too much bad data ... we only keep it around
2282         * now to weed it out in case someone sends it to us via pex */
2283        if( atom->myflags & MYFLAG_BANNED )
2284            continue;
2285
2286        /* peer was unconnectable before, so we're not going to keep trying.
2287         * this is needs a separate flag from `banned', since if they try
2288         * to connect to us later, we'll let them in */
2289        if( atom->myflags & MYFLAG_UNREACHABLE )
2290            continue;
2291
2292        /* we don't need two connections to the same peer... */
2293        if( peerIsInUse( t, &atom->addr ) )
2294            continue;
2295
2296        /* no need to connect if we're both seeds... */
2297        if( seed && ( ( atom->flags & ADDED_F_SEED_FLAG ) ||
2298                      ( atom->uploadOnly == UPLOAD_ONLY_YES ) ) )
2299            continue;
2300
2301        /* don't reconnect too often */
2302        interval = getReconnectIntervalSecs( atom );
2303        if( ( now - atom->time ) < interval )
2304        {
2305            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2306                    i, tr_atomAddrStr( atom ), interval );
2307            continue;
2308        }
2309
2310        /* Don't connect to peers in our blocklist */
2311        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2312            continue;
2313
2314        ret[retCount++] = atom;
2315    }
2316
2317    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2318    *setmeSize = retCount;
2319    return ret;
2320}
2321
2322static void
2323closePeer( Torrent * t, tr_peer * peer )
2324{
2325    struct peer_atom * atom;
2326
2327    assert( t != NULL );
2328    assert( peer != NULL );
2329
2330    atom = peer->atom;
2331
2332    /* if we transferred piece data, then they might be good peers,
2333       so reset their `numFails' weight to zero.  otherwise we connected
2334       to them fruitlessly, so mark it as another fail */
2335    if( atom->piece_data_time )
2336        atom->numFails = 0;
2337    else
2338        ++atom->numFails;
2339
2340    tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
2341    removePeer( t, peer );
2342}
2343
2344static void
2345reconnectTorrent( Torrent * t )
2346{
2347    static time_t prevTime = 0;
2348    static int    newConnectionsThisSecond = 0;
2349    time_t        now;
2350
2351    now = time( NULL );
2352    if( prevTime != now )
2353    {
2354        prevTime = now;
2355        newConnectionsThisSecond = 0;
2356    }
2357
2358    if( !t->isRunning )
2359    {
2360        removeAllPeers( t );
2361    }
2362    else
2363    {
2364        int i;
2365        int canCloseCount;
2366        int mustCloseCount;
2367        int candidateCount;
2368        int maxCandidates;
2369        struct tr_peer ** canClose = getPeersToClose( t, TR_CAN_CLOSE, &canCloseCount );
2370        struct tr_peer ** mustClose = getPeersToClose( t, TR_MUST_CLOSE, &mustCloseCount );
2371        struct peer_atom ** candidates = getPeerCandidates( t, &candidateCount );
2372
2373        tordbg( t, "reconnect pulse for [%s]: "
2374                   "%d must-close connections, "
2375                   "%d can-close connections, "
2376                   "%d connection candidates, "
2377                   "%d atoms, "
2378                   "max per pulse is %d",
2379                   tr_torrentName( t->tor ),
2380                   mustCloseCount,
2381                   canCloseCount,
2382                   candidateCount,
2383                   tr_ptrArraySize( &t->pool ),
2384                   MAX_RECONNECTIONS_PER_PULSE );
2385
2386        /* disconnect the really bad peers */
2387        for( i=0; i<mustCloseCount; ++i )
2388            closePeer( t, mustClose[i] );
2389
2390        /* decide how many peers can we try to add in this pass */
2391        maxCandidates = candidateCount;
2392        maxCandidates = MIN( maxCandidates, MAX_RECONNECTIONS_PER_PULSE );
2393        maxCandidates = MIN( maxCandidates, getMaxPeerCount( t->tor ) - getPeerCount( t ) );
2394        maxCandidates = MIN( maxCandidates, MAX_CONNECTIONS_PER_SECOND - newConnectionsThisSecond );
2395
2396        /* maybe disconnect some lesser peers, if we have candidates to replace them with */
2397        for( i=0; ( i<canCloseCount ) && ( i<maxCandidates ); ++i )
2398            closePeer( t, canClose[i] );
2399
2400        tordbg( t, "candidateCount is %d, MAX_RECONNECTIONS_PER_PULSE is %d,"
2401                   " getPeerCount(t) is %d, getMaxPeerCount(t) is %d, "
2402                   "newConnectionsThisSecond is %d, MAX_CONNECTIONS_PER_SECOND is %d",
2403                   candidateCount,
2404                   MAX_RECONNECTIONS_PER_PULSE,
2405                   getPeerCount( t ),
2406                   getMaxPeerCount( t->tor ),
2407                   newConnectionsThisSecond, MAX_CONNECTIONS_PER_SECOND );
2408
2409        /* add some new ones */
2410        for( i=0; i<maxCandidates; ++i )
2411        {
2412            tr_peerMgr        * mgr = t->manager;
2413            struct peer_atom  * atom = candidates[i];
2414            tr_peerIo         * io;
2415
2416            tordbg( t, "Starting an OUTGOING connection with %s",
2417                   tr_atomAddrStr( atom ) );
2418
2419            io = tr_peerIoNewOutgoing( mgr->session, mgr->session->bandwidth, &atom->addr, atom->port, t->tor->info.hash );
2420
2421            if( io == NULL )
2422            {
2423                tordbg( t, "peerIo not created; marking peer %s as unreachable",
2424                        tr_atomAddrStr( atom ) );
2425                atom->myflags |= MYFLAG_UNREACHABLE;
2426            }
2427            else
2428            {
2429                tr_handshake * handshake = tr_handshakeNew( io,
2430                                                            mgr->session->encryptionMode,
2431                                                            myHandshakeDoneCB,
2432                                                            mgr );
2433
2434                assert( tr_peerIoGetTorrentHash( io ) );
2435
2436                tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewOutgoing() */
2437
2438                ++newConnectionsThisSecond;
2439
2440                tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
2441                                         handshakeCompare );
2442            }
2443
2444            atom->time = time( NULL );
2445        }
2446
2447        /* cleanup */
2448        tr_free( candidates );
2449        tr_free( mustClose );
2450        tr_free( canClose );
2451    }
2452}
2453
2454struct peer_liveliness
2455{
2456    tr_peer * peer;
2457    void * clientData;
2458    time_t pieceDataTime;
2459    time_t time;
2460    int speed;
2461    tr_bool doPurge;
2462};
2463
2464static int
2465comparePeerLiveliness( const void * va, const void * vb )
2466{
2467    const struct peer_liveliness * a = va;
2468    const struct peer_liveliness * b = vb;
2469
2470    if( a->doPurge != b->doPurge )
2471        return a->doPurge ? 1 : -1;
2472
2473    if( a->speed != b->speed ) /* faster goes first */
2474        return a->speed > b->speed ? -1 : 1;
2475
2476    /* the one to give us data more recently goes first */
2477    if( a->pieceDataTime != b->pieceDataTime )
2478        return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
2479
2480    /* the one we connected to most recently goes first */
2481    if( a->time != b->time )
2482        return a->time > b->time ? -1 : 1;
2483
2484    return 0;
2485}
2486
2487static int
2488comparePeerLivelinessReverse( const void * va, const void * vb )
2489{
2490    return -comparePeerLiveliness (va, vb);
2491}
2492
2493static void
2494sortPeersByLivelinessImpl( tr_peer  ** peers,
2495                           void     ** clientData,
2496                           int         n,
2497                           uint64_t    now,
2498                           int (*compare) ( const void *va, const void *vb ) )
2499{
2500    int i;
2501    struct peer_liveliness *lives, *l;
2502
2503    /* build a sortable array of peer + extra info */
2504    lives = l = tr_new0( struct peer_liveliness, n );
2505    for( i=0; i<n; ++i, ++l )
2506    {
2507        tr_peer * p = peers[i];
2508        l->peer = p;
2509        l->doPurge = p->doPurge;
2510        l->pieceDataTime = p->atom->piece_data_time;
2511        l->time = p->atom->time;
2512        l->speed = 1024.0 * (   tr_peerGetPieceSpeed( p, now, TR_UP )
2513                              + tr_peerGetPieceSpeed( p, now, TR_DOWN ) );
2514        if( clientData )
2515            l->clientData = clientData[i];
2516    }
2517
2518    /* sort 'em */
2519    assert( n == ( l - lives ) );
2520    qsort( lives, n, sizeof( struct peer_liveliness ), compare );
2521
2522    /* build the peer array */
2523    for( i=0, l=lives; i<n; ++i, ++l ) {
2524        peers[i] = l->peer;
2525        if( clientData )
2526            clientData[i] = l->clientData;
2527    }
2528    assert( n == ( l - lives ) );
2529
2530    /* cleanup */
2531    tr_free( lives );
2532}
2533
2534static void
2535sortPeersByLiveliness( tr_peer ** peers, void ** clientData, int n, uint64_t now )
2536{
2537    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLiveliness );
2538}
2539
2540static void
2541sortPeersByLivelinessReverse( tr_peer ** peers, void ** clientData, int n, uint64_t now )
2542{
2543    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLivelinessReverse );
2544}
2545
2546
2547static void
2548enforceTorrentPeerLimit( Torrent * t, uint64_t now )
2549{
2550    int n = tr_ptrArraySize( &t->peers );
2551    const int max = tr_torrentGetPeerLimit( t->tor );
2552    if( n > max )
2553    {
2554        void * base = tr_ptrArrayBase( &t->peers );
2555        tr_peer ** peers = tr_memdup( base, n*sizeof( tr_peer* ) );
2556        sortPeersByLiveliness( peers, NULL, n, now );
2557        while( n > max )
2558            closePeer( t, peers[--n] );
2559        tr_free( peers );
2560    }
2561}
2562
2563static void
2564enforceSessionPeerLimit( tr_session * session, uint64_t now )
2565{
2566    int n = 0;
2567    tr_torrent * tor = NULL;
2568    const int max = tr_sessionGetPeerLimit( session );
2569
2570    /* count the total number of peers */
2571    while(( tor = tr_torrentNext( session, tor )))
2572        n += tr_ptrArraySize( &tor->torrentPeers->peers );
2573
2574    /* if there are too many, prune out the worst */
2575    if( n > max )
2576    {
2577        tr_peer ** peers = tr_new( tr_peer*, n );
2578        Torrent ** torrents = tr_new( Torrent*, n );
2579
2580        /* populate the peer array */
2581        n = 0;
2582        tor = NULL;
2583        while(( tor = tr_torrentNext( session, tor ))) {
2584            int i;
2585            Torrent * t = tor->torrentPeers;
2586            const int tn = tr_ptrArraySize( &t->peers );
2587            for( i=0; i<tn; ++i, ++n ) {
2588                peers[n] = tr_ptrArrayNth( &t->peers, i );
2589                torrents[n] = t;
2590            }
2591        }
2592
2593        /* sort 'em */
2594        sortPeersByLiveliness( peers, (void**)torrents, n, now );
2595
2596        /* cull out the crappiest */
2597        while( n-- > max )
2598            closePeer( torrents[n], peers[n] );
2599
2600        /* cleanup */
2601        tr_free( torrents );
2602        tr_free( peers );
2603    }
2604}
2605
2606
2607static int
2608reconnectPulse( void * vmgr )
2609{
2610    tr_torrent * tor;
2611    tr_peerMgr * mgr = vmgr;
2612    uint64_t now;
2613    managerLock( mgr );
2614
2615    now = tr_date( );
2616
2617    /* if we're over the per-torrent peer limits, cull some peers */
2618    tor = NULL;
2619    while(( tor = tr_torrentNext( mgr->session, tor )))
2620        if( tor->isRunning )
2621            enforceTorrentPeerLimit( tor->torrentPeers, now );
2622
2623    /* if we're over the per-session peer limits, cull some peers */
2624    enforceSessionPeerLimit( mgr->session, now );
2625
2626    tor = NULL;
2627    while(( tor = tr_torrentNext( mgr->session, tor )))
2628        if( tor->isRunning )
2629            reconnectTorrent( tor->torrentPeers );
2630
2631    managerUnlock( mgr );
2632    return TRUE;
2633}
2634
2635/****
2636*****
2637*****  BANDWIDTH ALLOCATION
2638*****
2639****/
2640
2641static void
2642pumpAllPeers( tr_peerMgr * mgr )
2643{
2644    tr_torrent * tor = NULL;
2645
2646    while(( tor = tr_torrentNext( mgr->session, tor )))
2647    {
2648        int j;
2649        Torrent * t = tor->torrentPeers;
2650
2651        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
2652        {
2653            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
2654            tr_peerMsgsPulse( peer->msgs );
2655        }
2656    }
2657}
2658
2659static int
2660bandwidthPulse( void * vmgr )
2661{
2662    tr_torrent * tor = NULL;
2663    tr_peerMgr * mgr = vmgr;
2664    managerLock( mgr );
2665
2666    /* FIXME: this next line probably isn't necessary... */
2667    pumpAllPeers( mgr );
2668
2669    /* allocate bandwidth to the peers */
2670    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
2671    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
2672
2673    /* possibly stop torrents that have seeded enough */
2674    while(( tor = tr_torrentNext( mgr->session, tor ))) {
2675        if( tor->needsSeedRatioCheck ) {
2676            tor->needsSeedRatioCheck = FALSE;
2677            tr_torrentCheckSeedRatio( tor );
2678        }
2679    }
2680
2681    /* possibly stop torrents that have an error */
2682    tor = NULL;
2683    while(( tor = tr_torrentNext( mgr->session, tor )))
2684        if( tor->isRunning && ( tor->error == TR_STAT_LOCAL_ERROR ))
2685            tr_torrentStop( tor );
2686
2687    managerUnlock( mgr );
2688    return TRUE;
2689}
Note: See TracBrowser for help on using the repository browser.