source: trunk/libtransmission/peer-mgr.c @ 9482

Last change on this file since 9482 was 9482, checked in by charles, 13 years ago

(trunk libT) #2552: Torrent state doesn't change automatically anymore

  • Property svn:keywords set to Date Rev Author Id
File size: 75.5 KB
Line 
1/*
2 * This file Copyright (C) 2007-2009 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 9482 2009-11-05 01:31:35Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h> /* memcpy, memcmp, strstr */
16#include <stdlib.h> /* qsort */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "bandwidth.h"
23#include "bencode.h"
24#include "blocklist.h"
25#include "clients.h"
26#include "completion.h"
27#include "crypto.h"
28#include "handshake.h"
29#include "inout.h" /* tr_ioTestPiece */
30#include "net.h"
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-msgs.h"
34#include "ptrarray.h"
35#include "session.h"
36#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
37#include "torrent.h"
38#include "trevent.h"
39#include "utils.h"
40#include "webseed.h"
41
42enum
43{
44    /* how frequently to change which peers are choked */
45    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
46
47    /* minimum interval for refilling peers' request lists */
48    REFILL_PERIOD_MSEC = 400,
49
50    /* how frequently to reallocate bandwidth */
51    BANDWIDTH_PERIOD_MSEC = 500,
52
53    /* how frequently to age out old piece request lists */
54    REFILL_UPKEEP_PERIOD_MSEC = ( 10 * 1000 ),
55
56    /* how frequently to decide which peers live and die */
57    RECONNECT_PERIOD_MSEC = 500,
58
59    /* when many peers are available, keep idle ones this long */
60    MIN_UPLOAD_IDLE_SECS = ( 60 ),
61
62    /* when few peers are available, keep idle ones this long */
63    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
64
65    /* max # of peers to ask fer per torrent per reconnect pulse */
66    MAX_RECONNECTIONS_PER_PULSE = 8,
67
68    /* max number of peers to ask for per second overall.
69    * this throttle is to avoid overloading the router */
70    MAX_CONNECTIONS_PER_SECOND = 16,
71
72    /* number of bad pieces a peer is allowed to send before we ban them */
73    MAX_BAD_PIECES_PER_PEER = 5,
74
75    /* amount of time to keep a list of request pieces lying around
76       before it's considered too old and needs to be rebuilt */
77    PIECE_LIST_SHELF_LIFE_SECS = 60,
78
79    /* use for bitwise operations w/peer_atom.myflags */
80    MYFLAG_BANNED = 1,
81
82    /* use for bitwise operations w/peer_atom.myflags */
83    /* unreachable for now... but not banned.
84     * if they try to connect to us it's okay */
85    MYFLAG_UNREACHABLE = 2,
86
87    /* the minimum we'll wait before attempting to reconnect to a peer */
88    MINIMUM_RECONNECT_INTERVAL_SECS = 5
89};
90
91
92/**
93***
94**/
95
96enum
97{
98    UPLOAD_ONLY_UKNOWN,
99    UPLOAD_ONLY_YES,
100    UPLOAD_ONLY_NO
101};
102
103/**
104 * Peer information that should be kept even before we've connected and
105 * after we've disconnected.  These are kept in a pool of peer_atoms to decide
106 * which ones would make good candidates for connecting to, and to watch out
107 * for banned peers.
108 *
109 * @see tr_peer
110 * @see tr_peermsgs
111 */
112struct peer_atom
113{
114    uint8_t     from;
115    uint8_t     flags;       /* these match the added_f flags */
116    uint8_t     myflags;     /* flags that aren't defined in added_f */
117    uint8_t     uploadOnly;  /* UPLOAD_ONLY_ */
118    tr_port     port;
119    uint16_t    numFails;
120    tr_address  addr;
121    time_t      time;        /* when the peer's connection status last changed */
122    time_t      piece_data_time;
123};
124
125static tr_bool
126tr_isAtom( const struct peer_atom * atom )
127{
128    return ( atom != NULL )
129        && ( atom->from < TR_PEER_FROM__MAX )
130        && ( tr_isAddress( &atom->addr ) );
131}
132
133static const char*
134tr_atomAddrStr( const struct peer_atom * atom )
135{
136    return tr_peerIoAddrStr( &atom->addr, atom->port );
137}
138
139struct tr_blockIterator
140{
141    time_t expirationDate;
142    struct tr_torrent_peers * t;
143    tr_block_index_t blockIndex, blockCount, *blocks;
144    tr_piece_index_t pieceIndex, pieceCount, *pieces;
145};
146
147typedef struct tr_torrent_peers
148{
149    struct event               refillTimer;
150
151    tr_ptrArray                outgoingHandshakes; /* tr_handshake */
152    tr_ptrArray                pool; /* struct peer_atom */
153    tr_ptrArray                peers; /* tr_peer */
154    tr_ptrArray                webseeds; /* tr_webseed */
155
156    tr_torrent               * tor;
157    tr_peer                  * optimistic; /* the optimistic peer, or NULL if none */
158    struct tr_blockIterator  * refillQueue; /* used in refillPulse() */
159    struct tr_peerMgr        * manager;
160    int                      * pendingRequestCount;
161
162    tr_bool                    isRunning;
163}
164Torrent;
165
166struct tr_peerMgr
167{
168    tr_session      * session;
169    tr_ptrArray       incomingHandshakes; /* tr_handshake */
170    tr_timer        * bandwidthTimer;
171    tr_timer        * rechokeTimer;
172    tr_timer        * reconnectTimer;
173    tr_timer        * refillUpkeepTimer;
174};
175
176#define tordbg( t, ... ) \
177    do { \
178        if( tr_deepLoggingIsActive( ) ) \
179            tr_deepLog( __FILE__, __LINE__, tr_torrentName( t->tor ), __VA_ARGS__ ); \
180    } while( 0 )
181
182#define dbgmsg( ... ) \
183    do { \
184        if( tr_deepLoggingIsActive( ) ) \
185            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
186    } while( 0 )
187
188/**
189***
190**/
191
192static TR_INLINE void
193managerLock( const struct tr_peerMgr * manager )
194{
195    tr_globalLock( manager->session );
196}
197
198static TR_INLINE void
199managerUnlock( const struct tr_peerMgr * manager )
200{
201    tr_globalUnlock( manager->session );
202}
203
204static TR_INLINE void
205torrentLock( Torrent * torrent )
206{
207    managerLock( torrent->manager );
208}
209
210static TR_INLINE void
211torrentUnlock( Torrent * torrent )
212{
213    managerUnlock( torrent->manager );
214}
215
216static TR_INLINE int
217torrentIsLocked( const Torrent * t )
218{
219    return tr_globalIsLocked( t->manager->session );
220}
221
222/**
223***
224**/
225
226static int
227handshakeCompareToAddr( const void * va, const void * vb )
228{
229    const tr_handshake * a = va;
230
231    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
232}
233
234static int
235handshakeCompare( const void * a, const void * b )
236{
237    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
238}
239
240static tr_handshake*
241getExistingHandshake( tr_ptrArray      * handshakes,
242                      const tr_address * addr )
243{
244    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
245}
246
247static int
248comparePeerAtomToAddress( const void * va, const void * vb )
249{
250    const struct peer_atom * a = va;
251
252    return tr_compareAddresses( &a->addr, vb );
253}
254
255static int
256comparePeerAtoms( const void * va, const void * vb )
257{
258    const struct peer_atom * b = vb;
259
260    assert( tr_isAtom( b ) );
261
262    return comparePeerAtomToAddress( va, &b->addr );
263}
264
265/**
266***
267**/
268
269const tr_address *
270tr_peerAddress( const tr_peer * peer )
271{
272    return &peer->atom->addr;
273}
274
275static Torrent*
276getExistingTorrent( tr_peerMgr *    manager,
277                    const uint8_t * hash )
278{
279    tr_torrent * tor = tr_torrentFindFromHash( manager->session, hash );
280
281    return tor == NULL ? NULL : tor->torrentPeers;
282}
283
284static int
285peerCompare( const void * a, const void * b )
286{
287    return tr_compareAddresses( tr_peerAddress( a ), tr_peerAddress( b ) );
288}
289
290static int
291peerCompareToAddr( const void * a, const void * vb )
292{
293    return tr_compareAddresses( tr_peerAddress( a ), vb );
294}
295
296static tr_peer*
297getExistingPeer( Torrent          * torrent,
298                 const tr_address * addr )
299{
300    assert( torrentIsLocked( torrent ) );
301    assert( addr );
302
303    return tr_ptrArrayFindSorted( &torrent->peers, addr, peerCompareToAddr );
304}
305
306static struct peer_atom*
307getExistingAtom( const Torrent    * t,
308                 const tr_address * addr )
309{
310    Torrent * tt = (Torrent*)t;
311    assert( torrentIsLocked( t ) );
312    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
313}
314
315static tr_bool
316peerIsInUse( const Torrent    * ct,
317             const tr_address * addr )
318{
319    Torrent * t = (Torrent*) ct;
320
321    assert( torrentIsLocked ( t ) );
322
323    return getExistingPeer( t, addr )
324        || getExistingHandshake( &t->outgoingHandshakes, addr )
325        || getExistingHandshake( &t->manager->incomingHandshakes, addr );
326}
327
328static tr_peer*
329peerConstructor( struct peer_atom * atom )
330{
331    tr_peer * peer = tr_new0( tr_peer, 1 );
332    peer->atom = atom;
333    return peer;
334}
335
336static tr_peer*
337getPeer( Torrent * torrent, struct peer_atom * atom )
338{
339    tr_peer * peer;
340
341    assert( torrentIsLocked( torrent ) );
342
343    peer = getExistingPeer( torrent, &atom->addr );
344
345    if( peer == NULL )
346    {
347        peer = peerConstructor( atom );
348        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
349    }
350
351    return peer;
352}
353
354static void
355peerDestructor( tr_peer * peer )
356{
357    assert( peer );
358
359    if( peer->msgs != NULL )
360    {
361        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
362        tr_peerMsgsFree( peer->msgs );
363    }
364
365    tr_peerIoClear( peer->io );
366    tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
367
368    tr_bitfieldFree( peer->have );
369    tr_bitfieldFree( peer->blame );
370    tr_free( peer->client );
371
372    tr_free( peer );
373}
374
375static void
376removePeer( Torrent * t, tr_peer * peer )
377{
378    tr_peer * removed;
379    struct peer_atom * atom = peer->atom;
380
381    assert( torrentIsLocked( t ) );
382    assert( atom );
383
384    atom->time = time( NULL );
385
386    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
387    assert( removed == peer );
388    peerDestructor( removed );
389}
390
391static void
392removeAllPeers( Torrent * t )
393{
394    while( !tr_ptrArrayEmpty( &t->peers ) )
395        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
396}
397
398static void blockIteratorFree( struct tr_blockIterator ** inout );
399
400static void
401torrentDestructor( void * vt )
402{
403    Torrent * t = vt;
404
405    assert( t );
406    assert( !t->isRunning );
407    assert( torrentIsLocked( t ) );
408    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
409    assert( tr_ptrArrayEmpty( &t->peers ) );
410
411    evtimer_del( &t->refillTimer );
412
413    blockIteratorFree( &t->refillQueue );
414    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
415    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
416    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
417    tr_ptrArrayDestruct( &t->peers, NULL );
418
419    tr_free( t->pendingRequestCount );
420    tr_free( t );
421}
422
423
424static void refillPulse( int, short, void* );
425
426static void peerCallbackFunc( void * vpeer,
427                              void * vevent,
428                              void * vt );
429
430static Torrent*
431torrentConstructor( tr_peerMgr * manager,
432                    tr_torrent * tor )
433{
434    int       i;
435    Torrent * t;
436
437    t = tr_new0( Torrent, 1 );
438    t->manager = manager;
439    t->tor = tor;
440    t->pool = TR_PTR_ARRAY_INIT;
441    t->peers = TR_PTR_ARRAY_INIT;
442    t->webseeds = TR_PTR_ARRAY_INIT;
443    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
444    evtimer_set( &t->refillTimer, refillPulse, t );
445
446
447    for( i = 0; i < tor->info.webseedCount; ++i )
448    {
449        tr_webseed * w =
450            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
451        tr_ptrArrayAppend( &t->webseeds, w );
452    }
453
454    return t;
455}
456
457
458static int bandwidthPulse ( void * vmgr );
459static int rechokePulse   ( void * vmgr );
460static int reconnectPulse ( void * vmgr );
461static int refillUpkeep   ( void * vmgr );
462
463tr_peerMgr*
464tr_peerMgrNew( tr_session * session )
465{
466    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
467    m->session = session;
468    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
469    return m;
470}
471
472static void
473deleteTimers( struct tr_peerMgr * m )
474{
475    if( m->bandwidthTimer )
476        tr_timerFree( &m->bandwidthTimer );
477
478    if( m->rechokeTimer )
479        tr_timerFree( &m->rechokeTimer );
480
481    if( m->reconnectTimer )
482        tr_timerFree( &m->reconnectTimer );
483
484    if( m->refillUpkeepTimer )
485        tr_timerFree( &m->refillUpkeepTimer );
486}
487
488void
489tr_peerMgrFree( tr_peerMgr * manager )
490{
491    managerLock( manager );
492
493    deleteTimers( manager );
494
495    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
496     * the item from manager->handshakes, so this is a little roundabout... */
497    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
498        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
499
500    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
501
502    managerUnlock( manager );
503    tr_free( manager );
504}
505
506static int
507clientIsDownloadingFrom( const tr_peer * peer )
508{
509    return peer->clientIsInterested && !peer->clientIsChoked;
510}
511
512static int
513clientIsUploadingTo( const tr_peer * peer )
514{
515    return peer->peerIsInterested && !peer->peerIsChoked;
516}
517
518/***
519****
520***/
521
522tr_bool
523tr_peerMgrPeerIsSeed( const tr_torrent  * tor,
524                      const tr_address  * addr )
525{
526    tr_bool isSeed = FALSE;
527    const Torrent * t = tor->torrentPeers;
528    const struct peer_atom * atom = getExistingAtom( t, addr );
529
530    if( atom )
531        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
532
533    return isSeed;
534}
535
536/****
537*****
538*****  REFILL
539*****
540****/
541
542static void
543assertValidPiece( Torrent * t, tr_piece_index_t piece )
544{
545    assert( t );
546    assert( t->tor );
547    assert( piece < t->tor->info.pieceCount );
548}
549
550static int
551getPieceRequests( Torrent * t, tr_piece_index_t piece )
552{
553    assertValidPiece( t, piece );
554
555    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
556}
557
558static void
559incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
560{
561    assertValidPiece( t, piece );
562
563    if( t->pendingRequestCount == NULL )
564        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
565    t->pendingRequestCount[piece]++;
566}
567
568static void
569decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
570{
571    assertValidPiece( t, piece );
572
573    if( t->pendingRequestCount )
574        t->pendingRequestCount[piece]--;
575}
576
577struct tr_refill_piece
578{
579    tr_priority_t    priority;
580    uint32_t         piece;
581    uint32_t         peerCount;
582    int              random;
583    int              pendingRequestCount;
584    int              missingBlockCount;
585};
586
587static int
588compareRefillPiece( const void * aIn, const void * bIn )
589{
590    const struct tr_refill_piece * a = aIn;
591    const struct tr_refill_piece * b = bIn;
592
593    /* if one piece has a higher priority, it goes first */
594    if( a->priority != b->priority )
595        return a->priority > b->priority ? -1 : 1;
596
597    /* have a per-priority endgame */
598    if( a->pendingRequestCount != b->pendingRequestCount )
599        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
600
601    /* fewer missing pieces goes first */
602    if( a->missingBlockCount != b->missingBlockCount )
603        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
604
605    /* otherwise if one has fewer peers, it goes first */
606    if( a->peerCount != b->peerCount )
607        return a->peerCount < b->peerCount ? -1 : 1;
608
609    /* otherwise go with our random seed */
610    if( a->random != b->random )
611        return a->random < b->random ? -1 : 1;
612
613    return 0;
614}
615
616static tr_piece_index_t *
617getPreferredPieces( Torrent * t, tr_piece_index_t * pieceCount )
618{
619    const tr_torrent  * tor = t->tor;
620    const tr_info     * inf = &tor->info;
621    tr_piece_index_t    i;
622    tr_piece_index_t    poolSize = 0;
623    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
624    int                 peerCount;
625    const tr_peer    ** peers;
626
627    assert( torrentIsLocked( t ) );
628
629    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
630    peerCount = tr_ptrArraySize( &t->peers );
631
632    /* make a list of the pieces that we want but don't have */
633    for( i = 0; i < inf->pieceCount; ++i )
634        if( !tor->info.pieces[i].dnd
635                && !tr_cpPieceIsComplete( &tor->completion, i ) )
636            pool[poolSize++] = i;
637
638    /* sort the pool by which to request next */
639    if( poolSize > 1 )
640    {
641        tr_piece_index_t j;
642        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
643
644        for( j = 0; j < poolSize; ++j )
645        {
646            int k;
647            const tr_piece_index_t piece = pool[j];
648            struct tr_refill_piece * setme = p + j;
649
650            setme->piece = piece;
651            setme->priority = inf->pieces[piece].priority;
652            setme->peerCount = 0;
653            setme->random = tr_cryptoWeakRandInt( INT_MAX );
654            setme->pendingRequestCount = getPieceRequests( t, piece );
655            setme->missingBlockCount
656                         = tr_cpMissingBlocksInPiece( &tor->completion, piece );
657
658            for( k = 0; k < peerCount; ++k )
659            {
660                const tr_peer * peer = peers[k];
661                if( peer->peerIsInterested
662                        && !peer->clientIsChoked
663                        && tr_bitfieldHas( peer->have, piece ) )
664                    ++setme->peerCount;
665            }
666        }
667
668        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
669               compareRefillPiece );
670
671        for( j = 0; j < poolSize; ++j )
672            pool[j] = p[j].piece;
673
674        tr_free( p );
675    }
676
677    *pieceCount = poolSize;
678    return pool;
679}
680
681static struct tr_blockIterator*
682blockIteratorNew( Torrent * t )
683{
684    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
685    i->expirationDate = time( NULL ) + PIECE_LIST_SHELF_LIFE_SECS;
686    i->t = t;
687    i->pieces = getPreferredPieces( t, &i->pieceCount );
688    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCountInPiece );
689    tordbg( t, "creating new refill queue.. it contains %"PRIu32" pieces", i->pieceCount );
690    return i;
691}
692
693static tr_bool
694blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
695{
696    tr_bool found;
697    Torrent * t = i->t;
698    tr_torrent * tor = t->tor;
699
700    while( ( i->blockIndex == i->blockCount )
701        && ( i->pieceIndex < i->pieceCount ) )
702    {
703        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
704        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
705        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
706        tr_block_index_t block;
707
708        assert( index < tor->info.pieceCount );
709
710        i->blockCount = 0;
711        i->blockIndex = 0;
712        for( block=b; block!=e; ++block )
713            if( !tr_cpBlockIsCompleteFast( &tor->completion, block ) )
714                i->blocks[i->blockCount++] = block;
715    }
716
717    assert( i->blockCount <= tor->blockCountInPiece );
718
719    if(( found = ( i->blockIndex < i->blockCount )))
720        *setme = i->blocks[i->blockIndex++];
721
722    return found;
723}
724
725static void
726blockIteratorSkipCurrentPiece( struct tr_blockIterator * i )
727{
728    i->blockIndex = i->blockCount;
729}
730
731static void
732blockIteratorFree( struct tr_blockIterator ** inout )
733{
734    struct tr_blockIterator * it = *inout;
735
736    if( it != NULL )
737    {
738        tr_free( it->blocks );
739        tr_free( it->pieces );
740        tr_free( it );
741    }
742
743    *inout = NULL;
744}
745
746static tr_peer**
747getPeersUploadingToClient( Torrent * t,
748                           int *     setmeCount )
749{
750    int j;
751    int peerCount = 0;
752    int retCount = 0;
753    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
754    tr_peer ** ret = tr_new( tr_peer *, peerCount );
755
756    j = 0; /* this is a temporary test to make sure we walk through all the peers */
757    if( peerCount )
758    {
759        /* Get a list of peers we're downloading from.
760           Pick a different starting point each time so all peers
761           get a chance at being the first in line */
762        const int fencepost = tr_cryptoWeakRandInt( peerCount );
763        int i = fencepost;
764        do {
765            if( clientIsDownloadingFrom( peers[i] ) )
766                ret[retCount++] = peers[i];
767            i = ( i + 1 ) % peerCount;
768            ++j;
769        } while( i != fencepost );
770    }
771    assert( j == peerCount );
772    *setmeCount = retCount;
773    return ret;
774}
775
776static uint32_t
777getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
778{
779    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
780    const uint64_t blockPos = tor->blockSize * b;
781    assert( blockPos >= piecePos );
782    return (uint32_t)( blockPos - piecePos );
783}
784
785static int
786refillUpkeep( void * vmgr )
787{
788    tr_torrent * tor = NULL;
789    tr_peerMgr * mgr = vmgr;
790    time_t now;
791    managerLock( mgr );
792
793    now = time( NULL );
794    while(( tor = tr_torrentNext( mgr->session, tor ))) {
795        Torrent * t = tor->torrentPeers;
796        if( t && t->refillQueue && ( t->refillQueue->expirationDate <= now ) ) {
797            tordbg( t, "refill queue is past its shelf date; discarding." );
798            blockIteratorFree( &t->refillQueue );
799        }
800    }
801
802    managerUnlock( mgr );
803    return TRUE;
804}
805
806static void
807sortPeersByLiveliness( tr_peer ** peers, void ** clientData, int n, uint64_t now );
808
809static void
810refillPulse( int fd UNUSED, short type UNUSED, void * vtorrent )
811{
812    tr_block_index_t block;
813    int peerCount;
814    int webseedCount;
815    tr_peer ** peers;
816    tr_webseed ** webseeds;
817    Torrent * t = vtorrent;
818    tr_torrent * tor = t->tor;
819    tr_bool hasNext = TRUE;
820
821    if( !t->isRunning )
822        return;
823    if( tr_torrentIsSeed( t->tor ) )
824        return;
825
826    torrentLock( t );
827    tordbg( t, "Refilling Request Buffers..." );
828
829    if( t->refillQueue == NULL )
830        t->refillQueue = blockIteratorNew( t );
831
832    peers = getPeersUploadingToClient( t, &peerCount );
833    sortPeersByLiveliness( peers, NULL, peerCount, tr_date( ) );
834    webseedCount = tr_ptrArraySize( &t->webseeds );
835    webseeds = tr_memdup( tr_ptrArrayBase( &t->webseeds ),
836                          webseedCount * sizeof( tr_webseed* ) );
837
838    while( ( webseedCount || peerCount )
839        && (( hasNext = blockIteratorNext( t->refillQueue, &block ))) )
840    {
841        int j;
842        tr_bool handled = FALSE;
843
844        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
845        const uint32_t offset = getBlockOffsetInPiece( tor, block );
846        const uint32_t length = tr_torBlockCountBytes( tor, block );
847
848        assert( block < tor->blockCount );
849
850        /* find a peer who can ask for this block */
851        for( j=0; !handled && j<peerCount; )
852        {
853            const tr_addreq_t val = tr_peerMsgsAddRequest( peers[j]->msgs, index, offset, length );
854            switch( val )
855            {
856                case TR_ADDREQ_FULL:
857                case TR_ADDREQ_CLIENT_CHOKED:
858                    peers[j] = peers[--peerCount];
859                    break;
860
861                case TR_ADDREQ_MISSING:
862                case TR_ADDREQ_DUPLICATE:
863                    ++j;
864                    break;
865
866                case TR_ADDREQ_OK:
867                    incrementPieceRequests( t, index );
868                    handled = TRUE;
869                    break;
870
871                default:
872                    assert( 0 && "unhandled value" );
873                    break;
874            }
875        }
876
877        /* maybe one of the webseeds can do it */
878        for( j=0; !handled && j<webseedCount; )
879        {
880            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j], index, offset, length );
881            switch( val )
882            {
883                case TR_ADDREQ_FULL:
884                    webseeds[j] = webseeds[--webseedCount];
885                    break;
886
887                case TR_ADDREQ_OK:
888                    incrementPieceRequests( t, index );
889                    handled = TRUE;
890                    break;
891
892                default:
893                    assert( 0 && "unhandled value" );
894                    break;
895            }
896        }
897
898        if( !handled )
899            blockIteratorSkipCurrentPiece( t->refillQueue );
900    }
901
902    /* cleanup */
903    tr_free( webseeds );
904    tr_free( peers );
905
906    if( !hasNext ) {
907        tordbg( t, "refill queue has no more blocks to request... freeing (webseed count: %d, peer count: %d)", webseedCount, peerCount );
908        blockIteratorFree( &t->refillQueue );
909    }
910
911    torrentUnlock( t );
912}
913
914static void
915broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
916{
917    size_t i;
918    size_t peerCount;
919    tr_peer ** peers;
920
921    assert( torrentIsLocked( t ) );
922
923    tordbg( t, "got a block; cancelling any duplicate requests from peers %"PRIu32":%"PRIu32"->%"PRIu32, index, offset, length );
924
925    peerCount = tr_ptrArraySize( &t->peers );
926    peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
927    for( i=0; i<peerCount; ++i )
928        if( peers[i]->msgs )
929            tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
930}
931
932static void
933addStrike( Torrent * t, tr_peer * peer )
934{
935    tordbg( t, "increasing peer %s strike count to %d",
936            tr_atomAddrStr( peer->atom ), peer->strikes + 1 );
937
938    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
939    {
940        struct peer_atom * atom = peer->atom;
941        atom->myflags |= MYFLAG_BANNED;
942        peer->doPurge = 1;
943        tordbg( t, "banning peer %s", tr_atomAddrStr( atom ) );
944    }
945}
946
947static void
948gotBadPiece( Torrent *        t,
949             tr_piece_index_t pieceIndex )
950{
951    tr_torrent *   tor = t->tor;
952    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
953
954    tor->corruptCur += byteCount;
955    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
956}
957
958static void
959refillSoon( Torrent * t )
960{
961    if( !evtimer_pending( &t->refillTimer, NULL ) )
962        tr_timerAdd( &t->refillTimer, 0, REFILL_PERIOD_MSEC );
963}
964
965static void
966peerSuggestedPiece( Torrent            * t UNUSED,
967                    tr_peer            * peer UNUSED,
968                    tr_piece_index_t     pieceIndex UNUSED,
969                    int                  isFastAllowed UNUSED )
970{
971#if 0
972    assert( t );
973    assert( peer );
974    assert( peer->msgs );
975
976    /* is this a valid piece? */
977    if(  pieceIndex >= t->tor->info.pieceCount )
978        return;
979
980    /* don't ask for it if we've already got it */
981    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
982        return;
983
984    /* don't ask for it if they don't have it */
985    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
986        return;
987
988    /* don't ask for it if we're choked and it's not fast */
989    if( !isFastAllowed && peer->clientIsChoked )
990        return;
991
992    /* request the blocks that we don't have in this piece */
993    {
994        tr_block_index_t block;
995        const tr_torrent * tor = t->tor;
996        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
997        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
998
999        for( block=start; block<end; ++block )
1000        {
1001            if( !tr_cpBlockIsComplete( tor->completion, block ) )
1002            {
1003                const uint32_t offset = getBlockOffsetInPiece( tor, block );
1004                const uint32_t length = tr_torBlockCountBytes( tor, block );
1005                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
1006                incrementPieceRequests( t, pieceIndex );
1007            }
1008        }
1009    }
1010#endif
1011}
1012
1013static void
1014peerCallbackFunc( void * vpeer, void * vevent, void * vt )
1015{
1016    tr_peer * peer = vpeer; /* may be NULL if peer is a webseed */
1017    Torrent * t = vt;
1018    const tr_peer_event * e = vevent;
1019
1020    torrentLock( t );
1021
1022    switch( e->eventType )
1023    {
1024        case TR_PEER_UPLOAD_ONLY:
1025            /* update our atom */
1026            if( peer ) {
1027                if( e->uploadOnly ) {
1028                    peer->atom->uploadOnly = UPLOAD_ONLY_YES;
1029                    peer->atom->flags |= ADDED_F_SEED_FLAG;
1030                } else {
1031                    peer->atom->uploadOnly = UPLOAD_ONLY_NO;
1032                    peer->atom->flags &= ~ADDED_F_SEED_FLAG;
1033                }
1034            }
1035            break;
1036
1037        case TR_PEER_NEED_REQ:
1038            refillSoon( t );
1039            break;
1040
1041        case TR_PEER_CANCEL:
1042            decrementPieceRequests( t, e->pieceIndex );
1043            break;
1044
1045        case TR_PEER_PEER_GOT_DATA:
1046        {
1047            const time_t now = time( NULL );
1048            tr_torrent * tor = t->tor;
1049
1050            tr_torrentSetActivityDate( tor, now );
1051
1052            if( e->wasPieceData ) {
1053                tor->uploadedCur += e->length;
1054                tr_torrentSetDirty( tor );
1055            }
1056
1057            /* update the stats */
1058            if( e->wasPieceData )
1059                tr_statsAddUploaded( tor->session, e->length );
1060
1061            /* update our atom */
1062            if( peer && e->wasPieceData )
1063                peer->atom->piece_data_time = now;
1064
1065            tor->needsSeedRatioCheck = TRUE;
1066
1067            break;
1068        }
1069
1070        case TR_PEER_CLIENT_GOT_PORT:
1071            if( peer )
1072                peer->atom->port = e->port;
1073            break;
1074
1075        case TR_PEER_CLIENT_GOT_SUGGEST:
1076            if( peer )
1077                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1078            break;
1079
1080        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1081            if( peer )
1082                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1083            break;
1084
1085        case TR_PEER_CLIENT_GOT_DATA:
1086        {
1087            const time_t now = time( NULL );
1088            tr_torrent * tor = t->tor;
1089
1090            tr_torrentSetActivityDate( tor, now );
1091
1092            /* only add this to downloadedCur if we got it from a peer --
1093             * webseeds shouldn't count against our ratio.  As one tracker
1094             * admin put it, "Those pieces are downloaded directly from the
1095             * content distributor, not the peers, it is the tracker's job
1096             * to manage the swarms, not the web server and does not fit
1097             * into the jurisdiction of the tracker." */
1098            if( peer && e->wasPieceData ) {
1099                tor->downloadedCur += e->length;
1100                tr_torrentSetDirty( tor );
1101            }
1102
1103            /* update the stats */
1104            if( e->wasPieceData )
1105                tr_statsAddDownloaded( tor->session, e->length );
1106
1107            /* update our atom */
1108            if( peer && e->wasPieceData )
1109                peer->atom->piece_data_time = now;
1110
1111            break;
1112        }
1113
1114        case TR_PEER_PEER_PROGRESS:
1115        {
1116            if( peer )
1117            {
1118                struct peer_atom * atom = peer->atom;
1119                if( e->progress >= 1.0 ) {
1120                    tordbg( t, "marking peer %s as a seed",
1121                            tr_atomAddrStr( atom ) );
1122                    atom->flags |= ADDED_F_SEED_FLAG;
1123                }
1124            }
1125            break;
1126        }
1127
1128        case TR_PEER_CLIENT_GOT_BLOCK:
1129        {
1130            tr_torrent * tor = t->tor;
1131
1132            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1133
1134            tr_cpBlockAdd( &tor->completion, block );
1135            tr_torrentSetDirty( tor );
1136            decrementPieceRequests( t, e->pieceIndex );
1137
1138            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1139
1140            if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1141            {
1142                const tr_piece_index_t p = e->pieceIndex;
1143                const tr_bool ok = tr_ioTestPiece( tor, p, NULL, 0 );
1144
1145                if( !ok )
1146                {
1147                    tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1148                               (unsigned long)p );
1149                }
1150
1151                tr_torrentSetHasPiece( tor, p, ok );
1152                tr_torrentSetPieceChecked( tor, p, TRUE );
1153                tr_peerMgrSetBlame( tor, p, ok );
1154
1155                if( !ok )
1156                {
1157                    gotBadPiece( t, p );
1158                }
1159                else
1160                {
1161                    int i;
1162                    int peerCount;
1163                    tr_peer ** peers;
1164                    tr_file_index_t fileIndex;
1165
1166                    peerCount = tr_ptrArraySize( &t->peers );
1167                    peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1168                    for( i=0; i<peerCount; ++i )
1169                        tr_peerMsgsHave( peers[i]->msgs, p );
1170
1171                    for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex ) {
1172                        const tr_file * file = &tor->info.files[fileIndex];
1173                        if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) )
1174                            if( tr_cpFileIsComplete( &tor->completion, fileIndex ) )
1175                                tr_torrentFileCompleted( tor, fileIndex );
1176                    }
1177                }
1178                tr_torrentRecheckCompleteness( tor );
1179            }
1180            break;
1181        }
1182
1183        case TR_PEER_ERROR:
1184            if( ( e->err == ERANGE ) || ( e->err == EMSGSIZE ) || ( e->err == ENOTCONN ) )
1185            {
1186                /* some protocol error from the peer */
1187                peer->doPurge = 1;
1188                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
1189                        tr_atomAddrStr( peer->atom ) );
1190            }
1191            else 
1192            {
1193                tordbg( t, "unhandled error: %s", tr_strerror( e->err ) );
1194            }
1195            break;
1196
1197        default:
1198            assert( 0 );
1199    }
1200
1201    torrentUnlock( t );
1202}
1203
1204static void
1205ensureAtomExists( Torrent          * t,
1206                  const tr_address * addr,
1207                  tr_port            port,
1208                  uint8_t            flags,
1209                  uint8_t            from )
1210{
1211    assert( tr_isAddress( addr ) );
1212    assert( from < TR_PEER_FROM__MAX );
1213
1214    if( getExistingAtom( t, addr ) == NULL )
1215    {
1216        struct peer_atom * a;
1217        a = tr_new0( struct peer_atom, 1 );
1218        a->addr = *addr;
1219        a->port = port;
1220        a->flags = flags;
1221        a->from = from;
1222        tordbg( t, "got a new atom: %s", tr_atomAddrStr( a ) );
1223        tr_ptrArrayInsertSorted( &t->pool, a, comparePeerAtoms );
1224    }
1225}
1226
1227static int
1228getMaxPeerCount( const tr_torrent * tor )
1229{
1230    return tor->maxConnectedPeers;
1231}
1232
1233static int
1234getPeerCount( const Torrent * t )
1235{
1236    return tr_ptrArraySize( &t->peers );/* + tr_ptrArraySize( &t->outgoingHandshakes ); */
1237}
1238
1239/* FIXME: this is kind of a mess. */
1240static tr_bool
1241myHandshakeDoneCB( tr_handshake  * handshake,
1242                   tr_peerIo     * io,
1243                   tr_bool         isConnected,
1244                   const uint8_t * peer_id,
1245                   void          * vmanager )
1246{
1247    tr_bool            ok = isConnected;
1248    tr_bool            success = FALSE;
1249    tr_port            port;
1250    const tr_address * addr;
1251    tr_peerMgr       * manager = vmanager;
1252    Torrent          * t;
1253    tr_handshake     * ours;
1254
1255    assert( io );
1256    assert( tr_isBool( ok ) );
1257
1258    t = tr_peerIoHasTorrentHash( io )
1259        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1260        : NULL;
1261
1262    if( tr_peerIoIsIncoming ( io ) )
1263        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1264                                        handshake, handshakeCompare );
1265    else if( t )
1266        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1267                                        handshake, handshakeCompare );
1268    else
1269        ours = handshake;
1270
1271    assert( ours );
1272    assert( ours == handshake );
1273
1274    if( t )
1275        torrentLock( t );
1276
1277    addr = tr_peerIoGetAddress( io, &port );
1278
1279    if( !ok || !t || !t->isRunning )
1280    {
1281        if( t )
1282        {
1283            struct peer_atom * atom = getExistingAtom( t, addr );
1284            if( atom )
1285                ++atom->numFails;
1286        }
1287    }
1288    else /* looking good */
1289    {
1290        struct peer_atom * atom;
1291        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1292        atom = getExistingAtom( t, addr );
1293        atom->time = time( NULL );
1294        atom->piece_data_time = 0;
1295
1296        if( atom->myflags & MYFLAG_BANNED )
1297        {
1298            tordbg( t, "banned peer %s tried to reconnect",
1299                    tr_atomAddrStr( atom ) );
1300        }
1301        else if( tr_peerIoIsIncoming( io )
1302               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1303
1304        {
1305        }
1306        else
1307        {
1308            tr_peer * peer = getExistingPeer( t, addr );
1309
1310            if( peer ) /* we already have this peer */
1311            {
1312            }
1313            else
1314            {
1315                peer = getPeer( t, atom );
1316                tr_free( peer->client );
1317
1318                if( !peer_id )
1319                    peer->client = NULL;
1320                else {
1321                    char client[128];
1322                    tr_clientForId( client, sizeof( client ), peer_id );
1323                    peer->client = tr_strdup( client );
1324                }
1325
1326                peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is
1327                                                                balanced by our unref in peerDestructor()  */
1328                tr_peerIoSetParent( peer->io, t->tor->bandwidth );
1329                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1330
1331                success = TRUE;
1332            }
1333        }
1334    }
1335
1336    if( t )
1337        torrentUnlock( t );
1338
1339    return success;
1340}
1341
1342void
1343tr_peerMgrAddIncoming( tr_peerMgr * manager,
1344                       tr_address * addr,
1345                       tr_port      port,
1346                       int          socket )
1347{
1348    tr_session * session;
1349
1350    managerLock( manager );
1351
1352    assert( tr_isSession( manager->session ) );
1353    session = manager->session;
1354
1355    if( tr_sessionIsAddressBlocked( session, addr ) )
1356    {
1357        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
1358        tr_netClose( session, socket );
1359    }
1360    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
1361    {
1362        tr_netClose( session, socket );
1363    }
1364    else /* we don't have a connetion to them yet... */
1365    {
1366        tr_peerIo *    io;
1367        tr_handshake * handshake;
1368
1369        io = tr_peerIoNewIncoming( session, session->bandwidth, addr, port, socket );
1370
1371        handshake = tr_handshakeNew( io,
1372                                     session->encryptionMode,
1373                                     myHandshakeDoneCB,
1374                                     manager );
1375
1376        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
1377
1378        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
1379                                 handshakeCompare );
1380    }
1381
1382    managerUnlock( manager );
1383}
1384
1385static tr_bool
1386tr_isPex( const tr_pex * pex )
1387{
1388    return pex && tr_isAddress( &pex->addr );
1389}
1390
1391void
1392tr_peerMgrAddPex( tr_torrent   *  tor,
1393                  uint8_t         from,
1394                  const tr_pex *  pex )
1395{
1396    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
1397    {
1398        Torrent * t = tor->torrentPeers;
1399        managerLock( t->manager );
1400
1401        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
1402            if( tr_isValidPeerAddress( &pex->addr, pex->port ) )
1403                ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from );
1404
1405        managerUnlock( t->manager );
1406    }
1407}
1408
1409tr_pex *
1410tr_peerMgrCompactToPex( const void *    compact,
1411                        size_t          compactLen,
1412                        const uint8_t * added_f,
1413                        size_t          added_f_len,
1414                        size_t *        pexCount )
1415{
1416    size_t          i;
1417    size_t          n = compactLen / 6;
1418    const uint8_t * walk = compact;
1419    tr_pex *        pex = tr_new0( tr_pex, n );
1420
1421    for( i = 0; i < n; ++i )
1422    {
1423        pex[i].addr.type = TR_AF_INET;
1424        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
1425        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1426        if( added_f && ( n == added_f_len ) )
1427            pex[i].flags = added_f[i];
1428    }
1429
1430    *pexCount = n;
1431    return pex;
1432}
1433
1434tr_pex *
1435tr_peerMgrCompact6ToPex( const void    * compact,
1436                         size_t          compactLen,
1437                         const uint8_t * added_f,
1438                         size_t          added_f_len,
1439                         size_t        * pexCount )
1440{
1441    size_t          i;
1442    size_t          n = compactLen / 18;
1443    const uint8_t * walk = compact;
1444    tr_pex *        pex = tr_new0( tr_pex, n );
1445
1446    for( i = 0; i < n; ++i )
1447    {
1448        pex[i].addr.type = TR_AF_INET6;
1449        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
1450        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1451        if( added_f && ( n == added_f_len ) )
1452            pex[i].flags = added_f[i];
1453    }
1454
1455    *pexCount = n;
1456    return pex;
1457}
1458
1459tr_pex *
1460tr_peerMgrArrayToPex( const void * array,
1461                      size_t       arrayLen,
1462                      size_t      * pexCount )
1463{
1464    size_t          i;
1465    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
1466    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
1467    const uint8_t * walk = array;
1468    tr_pex        * pex = tr_new0( tr_pex, n );
1469
1470    for( i = 0 ; i < n ; i++ ) {
1471        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
1472        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
1473        pex[i].flags = 0x00;
1474        walk += sizeof( tr_address ) + 2;
1475    }
1476
1477    *pexCount = n;
1478    return pex;
1479}
1480
1481/**
1482***
1483**/
1484
1485void
1486tr_peerMgrSetBlame( tr_torrent     * tor,
1487                    tr_piece_index_t pieceIndex,
1488                    int              success )
1489{
1490    if( !success )
1491    {
1492        int        peerCount, i;
1493        Torrent *  t = tor->torrentPeers;
1494        tr_peer ** peers;
1495
1496        assert( torrentIsLocked( t ) );
1497
1498        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
1499        for( i = 0; i < peerCount; ++i )
1500        {
1501            tr_peer * peer = peers[i];
1502            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1503            {
1504                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1505                        tr_atomAddrStr( peer->atom ),
1506                        pieceIndex, (int)peer->strikes + 1 );
1507                addStrike( t, peer );
1508            }
1509        }
1510    }
1511}
1512
1513int
1514tr_pexCompare( const void * va, const void * vb )
1515{
1516    const tr_pex * a = va;
1517    const tr_pex * b = vb;
1518    int i;
1519
1520    assert( tr_isPex( a ) );
1521    assert( tr_isPex( b ) );
1522
1523    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
1524        return i;
1525
1526    if( a->port != b->port )
1527        return a->port < b->port ? -1 : 1;
1528
1529    return 0;
1530}
1531
1532#if 0
1533static int
1534peerPrefersCrypto( const tr_peer * peer )
1535{
1536    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1537        return TRUE;
1538
1539    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1540        return FALSE;
1541
1542    return tr_peerIoIsEncrypted( peer->io );
1543}
1544#endif
1545
1546/* better goes first */
1547static int
1548compareAtomsByUsefulness( const void * va, const void *vb )
1549{
1550    const struct peer_atom * a = * (const struct peer_atom**) va;
1551    const struct peer_atom * b = * (const struct peer_atom**) vb;
1552
1553    assert( tr_isAtom( a ) );
1554    assert( tr_isAtom( b ) );
1555
1556    if( a->piece_data_time != b->piece_data_time )
1557        return a->piece_data_time > b->piece_data_time ? -1 : 1;
1558    if( a->from != b->from )
1559        return a->from < b->from ? -1 : 1;
1560    if( a->numFails != b->numFails )
1561        return a->numFails < b->numFails ? -1 : 1;
1562
1563    return 0;
1564}
1565
1566int
1567tr_peerMgrGetPeers( tr_torrent * tor, tr_pex ** setme_pex, uint8_t af, int maxPeerCount )
1568{
1569    int count = 0;
1570    const Torrent * t = tor->torrentPeers;
1571
1572    managerLock( t->manager );
1573
1574    {
1575        int i;
1576        const int atomCount = tr_ptrArraySize( &t->pool );
1577        const int pexCount = MIN( atomCount, maxPeerCount );
1578        const struct peer_atom ** atomsBase = (const struct peer_atom**) tr_ptrArrayBase( &t->pool );
1579        struct peer_atom ** atoms = tr_memdup( atomsBase, atomCount * sizeof( struct peer_atom * ) );
1580        /* for now, this will waste memory on torrents that have both
1581         * ipv6 and ipv4 peers */
1582        tr_pex * pex = tr_new0( tr_pex, atomCount );
1583        tr_pex * walk = pex;
1584
1585        qsort( atoms, atomCount, sizeof( struct peer_atom * ), compareAtomsByUsefulness );
1586
1587        for( i=0; i<atomCount && count<pexCount; ++i )
1588        {
1589            const struct peer_atom * atom = atoms[i];
1590            if( atom->addr.type == af )
1591            {
1592                assert( tr_isAddress( &atom->addr ) );
1593                walk->addr = atom->addr;
1594                walk->port = atom->port;
1595                walk->flags = atom->flags;
1596                ++count;
1597                ++walk;
1598            }
1599        }
1600
1601        assert( ( walk - pex ) == count );
1602        *setme_pex = pex;
1603
1604        tr_free( atoms );
1605    }
1606
1607    managerUnlock( t->manager );
1608    return count;
1609}
1610
1611static void
1612ensureMgrTimersExist( struct tr_peerMgr * m )
1613{
1614    tr_session * s = m->session;
1615
1616    if( m->bandwidthTimer == NULL )
1617        m->bandwidthTimer = tr_timerNew( s, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
1618
1619    if( m->rechokeTimer == NULL )
1620        m->rechokeTimer = tr_timerNew( s, rechokePulse, m, RECHOKE_PERIOD_MSEC );
1621
1622    if( m->reconnectTimer == NULL )
1623        m->reconnectTimer = tr_timerNew( s, reconnectPulse, m, RECONNECT_PERIOD_MSEC );
1624
1625    if( m->refillUpkeepTimer == NULL )
1626        m->refillUpkeepTimer = tr_timerNew( s, refillUpkeep, m, REFILL_UPKEEP_PERIOD_MSEC );
1627}
1628
1629void
1630tr_peerMgrStartTorrent( tr_torrent * tor )
1631{
1632    Torrent * t = tor->torrentPeers;
1633
1634    assert( t != NULL );
1635    managerLock( t->manager );
1636    ensureMgrTimersExist( t->manager );
1637
1638    if( !t->isRunning )
1639    {
1640        t->isRunning = TRUE;
1641
1642        if( !tr_ptrArrayEmpty( &t->webseeds ) )
1643            refillSoon( t );
1644    }
1645
1646    rechokePulse( t->manager );
1647    managerUnlock( t->manager );
1648}
1649
1650static void
1651stopTorrent( Torrent * t )
1652{
1653    assert( torrentIsLocked( t ) );
1654
1655    t->isRunning = FALSE;
1656
1657    /* disconnect the peers. */
1658    tr_ptrArrayForeach( &t->peers, (PtrArrayForeachFunc)peerDestructor );
1659    tr_ptrArrayClear( &t->peers );
1660
1661    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1662     * which removes the handshake from t->outgoingHandshakes... */
1663    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
1664        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
1665}
1666
1667void
1668tr_peerMgrStopTorrent( tr_torrent * tor )
1669{
1670    Torrent * t = tor->torrentPeers;
1671
1672    managerLock( t->manager );
1673
1674    stopTorrent( t );
1675
1676    managerUnlock( t->manager );
1677}
1678
1679void
1680tr_peerMgrAddTorrent( tr_peerMgr * manager,
1681                      tr_torrent * tor )
1682{
1683    managerLock( manager );
1684
1685    assert( tor );
1686    assert( tor->torrentPeers == NULL );
1687
1688    tor->torrentPeers = torrentConstructor( manager, tor );
1689
1690    managerUnlock( manager );
1691}
1692
1693void
1694tr_peerMgrRemoveTorrent( tr_torrent * tor )
1695{
1696    tr_torrentLock( tor );
1697
1698    stopTorrent( tor->torrentPeers );
1699    torrentDestructor( tor->torrentPeers );
1700
1701    tr_torrentUnlock( tor );
1702}
1703
1704void
1705tr_peerMgrTorrentAvailability( const tr_torrent * tor,
1706                               int8_t           * tab,
1707                               unsigned int       tabCount )
1708{
1709    tr_piece_index_t   i;
1710    const Torrent *    t;
1711    float              interval;
1712    tr_bool            isSeed;
1713    int                peerCount;
1714    const tr_peer **   peers;
1715    tr_torrentLock( tor );
1716
1717    t = tor->torrentPeers;
1718    tor = t->tor;
1719    interval = tor->info.pieceCount / (float)tabCount;
1720    isSeed = tor && ( tr_cpGetStatus ( &tor->completion ) == TR_SEED );
1721    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1722    peerCount = tr_ptrArraySize( &t->peers );
1723
1724    memset( tab, 0, tabCount );
1725
1726    for( i = 0; tor && i < tabCount; ++i )
1727    {
1728        const int piece = i * interval;
1729
1730        if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
1731            tab[i] = -1;
1732        else if( peerCount ) {
1733            int j;
1734            for( j = 0; j < peerCount; ++j )
1735                if( tr_bitfieldHas( peers[j]->have, i ) )
1736                    ++tab[i];
1737        }
1738    }
1739
1740    tr_torrentUnlock( tor );
1741}
1742
1743/* Returns the pieces that are available from peers */
1744tr_bitfield*
1745tr_peerMgrGetAvailable( const tr_torrent * tor )
1746{
1747    int i;
1748    int peerCount;
1749    Torrent * t = tor->torrentPeers;
1750    const tr_peer ** peers;
1751    tr_bitfield * pieces;
1752    managerLock( t->manager );
1753
1754    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1755    peerCount = tr_ptrArraySize( &t->peers );
1756    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1757    for( i=0; i<peerCount; ++i )
1758        tr_bitfieldOr( pieces, peers[i]->have );
1759
1760    managerUnlock( t->manager );
1761    return pieces;
1762}
1763
1764void
1765tr_peerMgrTorrentStats( tr_torrent       * tor,
1766                        int              * setmePeersKnown,
1767                        int              * setmePeersConnected,
1768                        int              * setmeSeedsConnected,
1769                        int              * setmeWebseedsSendingToUs,
1770                        int              * setmePeersSendingToUs,
1771                        int              * setmePeersGettingFromUs,
1772                        int              * setmePeersFrom )
1773{
1774    int i, size;
1775    const Torrent * t = tor->torrentPeers;
1776    const tr_peer ** peers;
1777    const tr_webseed ** webseeds;
1778
1779    managerLock( t->manager );
1780
1781    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1782    size = tr_ptrArraySize( &t->peers );
1783
1784    *setmePeersKnown           = tr_ptrArraySize( &t->pool );
1785    *setmePeersConnected       = 0;
1786    *setmeSeedsConnected       = 0;
1787    *setmePeersGettingFromUs   = 0;
1788    *setmePeersSendingToUs     = 0;
1789    *setmeWebseedsSendingToUs  = 0;
1790
1791    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1792        setmePeersFrom[i] = 0;
1793
1794    for( i=0; i<size; ++i )
1795    {
1796        const tr_peer * peer = peers[i];
1797        const struct peer_atom * atom = peer->atom;
1798
1799        if( peer->io == NULL ) /* not connected */
1800            continue;
1801
1802        ++*setmePeersConnected;
1803
1804        ++setmePeersFrom[atom->from];
1805
1806        if( clientIsDownloadingFrom( peer ) )
1807            ++*setmePeersSendingToUs;
1808
1809        if( clientIsUploadingTo( peer ) )
1810            ++*setmePeersGettingFromUs;
1811
1812        if( atom->flags & ADDED_F_SEED_FLAG )
1813            ++*setmeSeedsConnected;
1814    }
1815
1816    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1817    size = tr_ptrArraySize( &t->webseeds );
1818    for( i=0; i<size; ++i )
1819        if( tr_webseedIsActive( webseeds[i] ) )
1820            ++*setmeWebseedsSendingToUs;
1821
1822    managerUnlock( t->manager );
1823}
1824
1825float
1826tr_peerMgrGetWebseedSpeed( const tr_torrent * tor, uint64_t now )
1827{
1828    int i;
1829    float tmp;
1830    float ret = 0;
1831
1832    const Torrent * t = tor->torrentPeers;
1833    const int n = tr_ptrArraySize( &t->webseeds );
1834    const tr_webseed ** webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1835
1836    for( i=0; i<n; ++i )
1837        if( tr_webseedGetSpeed( webseeds[i], now, &tmp ) )
1838            ret += tmp;
1839
1840    return ret;
1841}
1842
1843
1844float*
1845tr_peerMgrWebSpeeds( const tr_torrent * tor )
1846{
1847    const Torrent * t = tor->torrentPeers;
1848    const tr_webseed ** webseeds;
1849    int i;
1850    int webseedCount;
1851    float * ret;
1852    uint64_t now;
1853
1854    assert( t->manager );
1855    managerLock( t->manager );
1856
1857    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1858    webseedCount = tr_ptrArraySize( &t->webseeds );
1859    assert( webseedCount == tor->info.webseedCount );
1860    ret = tr_new0( float, webseedCount );
1861    now = tr_date( );
1862
1863    for( i=0; i<webseedCount; ++i )
1864        if( !tr_webseedGetSpeed( webseeds[i], now, &ret[i] ) )
1865            ret[i] = -1.0;
1866
1867    managerUnlock( t->manager );
1868    return ret;
1869}
1870
1871double
1872tr_peerGetPieceSpeed( const tr_peer * peer, uint64_t now, tr_direction direction )
1873{
1874    return peer->io ? tr_peerIoGetPieceSpeed( peer->io, now, direction ) : 0.0;
1875}
1876
1877
1878struct tr_peer_stat *
1879tr_peerMgrPeerStats( const tr_torrent    * tor,
1880                     int                 * setmeCount )
1881{
1882    int i, size;
1883    const Torrent * t = tor->torrentPeers;
1884    const tr_peer ** peers;
1885    tr_peer_stat * ret;
1886    uint64_t now;
1887
1888    assert( t->manager );
1889    managerLock( t->manager );
1890
1891    size = tr_ptrArraySize( &t->peers );
1892    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1893    ret = tr_new0( tr_peer_stat, size );
1894    now = tr_date( );
1895
1896    for( i=0; i<size; ++i )
1897    {
1898        char *                   pch;
1899        const tr_peer *          peer = peers[i];
1900        const struct peer_atom * atom = peer->atom;
1901        tr_peer_stat *           stat = ret + i;
1902
1903        tr_ntop( &atom->addr, stat->addr, sizeof( stat->addr ) );
1904        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
1905                   sizeof( stat->client ) );
1906        stat->port               = ntohs( peer->atom->port );
1907        stat->from               = atom->from;
1908        stat->progress           = peer->progress;
1909        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1910        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER );
1911        stat->rateToClient       = tr_peerGetPieceSpeed( peer, now, TR_PEER_TO_CLIENT );
1912        stat->peerIsChoked       = peer->peerIsChoked;
1913        stat->peerIsInterested   = peer->peerIsInterested;
1914        stat->clientIsChoked     = peer->clientIsChoked;
1915        stat->clientIsInterested = peer->clientIsInterested;
1916        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1917        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1918        stat->isUploadingTo      = clientIsUploadingTo( peer );
1919        stat->isSeed             = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
1920
1921        pch = stat->flagStr;
1922        if( t->optimistic == peer ) *pch++ = 'O';
1923        if( stat->isDownloadingFrom ) *pch++ = 'D';
1924        else if( stat->clientIsInterested ) *pch++ = 'd';
1925        if( stat->isUploadingTo ) *pch++ = 'U';
1926        else if( stat->peerIsInterested ) *pch++ = 'u';
1927        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
1928        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1929        if( stat->isEncrypted ) *pch++ = 'E';
1930        if( stat->from == TR_PEER_FROM_DHT ) *pch++ = 'H';
1931        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1932        if( stat->isIncoming ) *pch++ = 'I';
1933        *pch = '\0';
1934    }
1935
1936    *setmeCount = size;
1937
1938    managerUnlock( t->manager );
1939    return ret;
1940}
1941
1942/**
1943***
1944**/
1945
1946struct ChokeData
1947{
1948    tr_bool         doUnchoke;
1949    tr_bool         isInterested;
1950    tr_bool         isChoked;
1951    int             rate;
1952    tr_peer *       peer;
1953};
1954
1955static int
1956compareChoke( const void * va,
1957              const void * vb )
1958{
1959    const struct ChokeData * a = va;
1960    const struct ChokeData * b = vb;
1961
1962    if( a->rate != b->rate ) /* prefer higher overall speeds */
1963        return a->rate > b->rate ? -1 : 1;
1964
1965    if( a->isChoked != b->isChoked ) /* prefer unchoked */
1966        return a->isChoked ? 1 : -1;
1967
1968    return 0;
1969}
1970
1971static int
1972isNew( const tr_peer * peer )
1973{
1974    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1975}
1976
1977static int
1978isSame( const tr_peer * peer )
1979{
1980    return peer && peer->client && strstr( peer->client, "Transmission" );
1981}
1982
1983/**
1984***
1985**/
1986
1987static void
1988rechokeTorrent( Torrent * t, const uint64_t now )
1989{
1990    int i, size, unchokedInterested;
1991    const int peerCount = tr_ptrArraySize( &t->peers );
1992    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1993    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1994    const tr_session * session = t->manager->session;
1995    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
1996
1997    assert( torrentIsLocked( t ) );
1998
1999    /* sort the peers by preference and rate */
2000    for( i = 0, size = 0; i < peerCount; ++i )
2001    {
2002        tr_peer * peer = peers[i];
2003        struct peer_atom * atom = peer->atom;
2004
2005        if( peer->progress >= 1.0 ) /* choke all seeds */
2006        {
2007            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2008        }
2009        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
2010        {
2011            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2012        }
2013        else if( chokeAll ) /* choke everyone if we're not uploading */
2014        {
2015            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2016        }
2017        else
2018        {
2019            struct ChokeData * n = &choke[size++];
2020            n->peer         = peer;
2021            n->isInterested = peer->peerIsInterested;
2022            n->isChoked     = peer->peerIsChoked;
2023            n->rate         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER ) * 1024;
2024        }
2025    }
2026
2027    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
2028
2029    /**
2030     * Reciprocation and number of uploads capping is managed by unchoking
2031     * the N peers which have the best upload rate and are interested.
2032     * This maximizes the client's download rate. These N peers are
2033     * referred to as downloaders, because they are interested in downloading
2034     * from the client.
2035     *
2036     * Peers which have a better upload rate (as compared to the downloaders)
2037     * but aren't interested get unchoked. If they become interested, the
2038     * downloader with the worst upload rate gets choked. If a client has
2039     * a complete file, it uses its upload rate rather than its download
2040     * rate to decide which peers to unchoke.
2041     */
2042    unchokedInterested = 0;
2043    for( i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i ) {
2044        choke[i].doUnchoke = 1;
2045        if( choke[i].isInterested )
2046            ++unchokedInterested;
2047    }
2048
2049    /* optimistic unchoke */
2050    if( i < size )
2051    {
2052        int n;
2053        struct ChokeData * c;
2054        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
2055
2056        for( ; i<size; ++i )
2057        {
2058            if( choke[i].isInterested )
2059            {
2060                const tr_peer * peer = choke[i].peer;
2061                int x = 1, y;
2062                if( isNew( peer ) ) x *= 3;
2063                if( isSame( peer ) ) x *= 3;
2064                for( y=0; y<x; ++y )
2065                    tr_ptrArrayAppend( &randPool, &choke[i] );
2066            }
2067        }
2068
2069        if(( n = tr_ptrArraySize( &randPool )))
2070        {
2071            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
2072            c->doUnchoke = 1;
2073            t->optimistic = c->peer;
2074        }
2075
2076        tr_ptrArrayDestruct( &randPool, NULL );
2077    }
2078
2079    for( i=0; i<size; ++i )
2080        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
2081
2082    /* cleanup */
2083    tr_free( choke );
2084}
2085
2086static int
2087rechokePulse( void * vmgr )
2088{
2089    uint64_t now;
2090    tr_torrent * tor = NULL;
2091    tr_peerMgr * mgr = vmgr;
2092    managerLock( mgr );
2093
2094    now = tr_date( );
2095    while(( tor = tr_torrentNext( mgr->session, tor )))
2096        if( tor->isRunning )
2097            rechokeTorrent( tor->torrentPeers, now );
2098
2099    managerUnlock( mgr );
2100    return TRUE;
2101}
2102
2103/***
2104****
2105****  Life and Death
2106****
2107***/
2108
2109typedef enum
2110{
2111    TR_CAN_KEEP,
2112    TR_CAN_CLOSE,
2113    TR_MUST_CLOSE,
2114}
2115tr_close_type_t;
2116
2117static tr_close_type_t
2118shouldPeerBeClosed( const Torrent    * t,
2119                    const tr_peer    * peer,
2120                    int                peerCount )
2121{
2122    const tr_torrent *       tor = t->tor;
2123    const time_t             now = time( NULL );
2124    const struct peer_atom * atom = peer->atom;
2125
2126    /* if it's marked for purging, close it */
2127    if( peer->doPurge )
2128    {
2129        tordbg( t, "purging peer %s because its doPurge flag is set",
2130                tr_atomAddrStr( atom ) );
2131        return TR_MUST_CLOSE;
2132    }
2133
2134    /* if we're seeding and the peer has everything we have,
2135     * and enough time has passed for a pex exchange, then disconnect */
2136    if( tr_torrentIsSeed( tor ) )
2137    {
2138        int peerHasEverything;
2139        if( atom->flags & ADDED_F_SEED_FLAG )
2140            peerHasEverything = TRUE;
2141        else if( peer->progress < tr_cpPercentDone( &tor->completion ) )
2142            peerHasEverything = FALSE;
2143        else {
2144            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( &tor->completion ) );
2145            tr_bitfieldDifference( tmp, peer->have );
2146            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2147            tr_bitfieldFree( tmp );
2148        }
2149
2150        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 )))
2151        {
2152            tordbg( t, "purging peer %s because we're both seeds",
2153                    tr_atomAddrStr( atom ) );
2154            return TR_MUST_CLOSE;
2155        }
2156    }
2157
2158    /* disconnect if it's been too long since piece data has been transferred.
2159     * this is on a sliding scale based on number of available peers... */
2160    {
2161        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2162        /* if we have >= relaxIfFewerThan, strictness is 100%.
2163         * if we have zero connections, strictness is 0% */
2164        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
2165                               ? 1.0
2166                               : peerCount / (float)relaxStrictnessIfFewerThanN;
2167        const int lo = MIN_UPLOAD_IDLE_SECS;
2168        const int hi = MAX_UPLOAD_IDLE_SECS;
2169        const int limit = hi - ( ( hi - lo ) * strictness );
2170        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
2171/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
2172        if( idleTime > limit ) {
2173            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
2174                       tr_atomAddrStr( atom ), idleTime );
2175            return TR_CAN_CLOSE;
2176        }
2177    }
2178
2179    return TR_CAN_KEEP;
2180}
2181
2182static void sortPeersByLivelinessReverse( tr_peer ** peers, void ** clientData, int n, uint64_t now );
2183
2184static tr_peer **
2185getPeersToClose( Torrent * t, tr_close_type_t closeType, int * setmeSize )
2186{
2187    int i, peerCount, outsize;
2188    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
2189    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2190
2191    assert( torrentIsLocked( t ) );
2192
2193    for( i = outsize = 0; i < peerCount; ++i )
2194        if( shouldPeerBeClosed( t, peers[i], peerCount ) == closeType )
2195            ret[outsize++] = peers[i];
2196
2197    sortPeersByLivelinessReverse ( ret, NULL, outsize, tr_date( ) );
2198
2199    *setmeSize = outsize;
2200    return ret;
2201}
2202
2203static int
2204compareCandidates( const void * va, const void * vb )
2205{
2206    const struct peer_atom * a = *(const struct peer_atom**) va;
2207    const struct peer_atom * b = *(const struct peer_atom**) vb;
2208
2209    /* <Charles> Here we would probably want to try reconnecting to
2210     * peers that had most recently given us data. Lots of users have
2211     * trouble with resets due to their routers and/or ISPs. This way we
2212     * can quickly recover from an unwanted reset. So we sort
2213     * piece_data_time in descending order.
2214     */
2215
2216    if( a->piece_data_time != b->piece_data_time )
2217        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2218
2219    if( a->numFails != b->numFails )
2220        return a->numFails < b->numFails ? -1 : 1;
2221
2222    if( a->time != b->time )
2223        return a->time < b->time ? -1 : 1;
2224
2225    /* In order to avoid fragmenting the swarm, peers from trackers and
2226     * from the DHT should be preferred to peers from PEX. */
2227    if( a->from != b->from )
2228        return a->from < b->from ? -1 : 1;
2229
2230    return 0;
2231}
2232
2233static int
2234getReconnectIntervalSecs( const struct peer_atom * atom )
2235{
2236    int          sec;
2237    const time_t now = time( NULL );
2238
2239    /* if we were recently connected to this peer and transferring piece
2240     * data, try to reconnect to them sooner rather that later -- we don't
2241     * want network troubles to get in the way of a good peer. */
2242    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2243        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2244
2245    /* don't allow reconnects more often than our minimum */
2246    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2247        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2248
2249    /* otherwise, the interval depends on how many times we've tried
2250     * and failed to connect to the peer */
2251    else switch( atom->numFails ) {
2252        case 0: sec = 0; break;
2253        case 1: sec = 5; break;
2254        case 2: sec = 2 * 60; break;
2255        case 3: sec = 15 * 60; break;
2256        case 4: sec = 30 * 60; break;
2257        case 5: sec = 60 * 60; break;
2258        default: sec = 120 * 60; break;
2259    }
2260
2261    return sec;
2262}
2263
2264static struct peer_atom **
2265getPeerCandidates( Torrent * t, int * setmeSize )
2266{
2267    int                 i, atomCount, retCount;
2268    struct peer_atom ** atoms;
2269    struct peer_atom ** ret;
2270    const time_t        now = time( NULL );
2271    const int           seed = tr_torrentIsSeed( t->tor );
2272
2273    assert( torrentIsLocked( t ) );
2274
2275    atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
2276    ret = tr_new( struct peer_atom*, atomCount );
2277    for( i = retCount = 0; i < atomCount; ++i )
2278    {
2279        int                interval;
2280        struct peer_atom * atom = atoms[i];
2281
2282        /* peer fed us too much bad data ... we only keep it around
2283         * now to weed it out in case someone sends it to us via pex */
2284        if( atom->myflags & MYFLAG_BANNED )
2285            continue;
2286
2287        /* peer was unconnectable before, so we're not going to keep trying.
2288         * this is needs a separate flag from `banned', since if they try
2289         * to connect to us later, we'll let them in */
2290        if( atom->myflags & MYFLAG_UNREACHABLE )
2291            continue;
2292
2293        /* we don't need two connections to the same peer... */
2294        if( peerIsInUse( t, &atom->addr ) )
2295            continue;
2296
2297        /* no need to connect if we're both seeds... */
2298        if( seed && ( ( atom->flags & ADDED_F_SEED_FLAG ) ||
2299                      ( atom->uploadOnly == UPLOAD_ONLY_YES ) ) )
2300            continue;
2301
2302        /* don't reconnect too often */
2303        interval = getReconnectIntervalSecs( atom );
2304        if( ( now - atom->time ) < interval )
2305        {
2306            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2307                    i, tr_atomAddrStr( atom ), interval );
2308            continue;
2309        }
2310
2311        /* Don't connect to peers in our blocklist */
2312        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2313            continue;
2314
2315        ret[retCount++] = atom;
2316    }
2317
2318    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2319    *setmeSize = retCount;
2320    return ret;
2321}
2322
2323static void
2324closePeer( Torrent * t, tr_peer * peer )
2325{
2326    struct peer_atom * atom;
2327
2328    assert( t != NULL );
2329    assert( peer != NULL );
2330
2331    atom = peer->atom;
2332
2333    /* if we transferred piece data, then they might be good peers,
2334       so reset their `numFails' weight to zero.  otherwise we connected
2335       to them fruitlessly, so mark it as another fail */
2336    if( atom->piece_data_time )
2337        atom->numFails = 0;
2338    else
2339        ++atom->numFails;
2340
2341    tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
2342    removePeer( t, peer );
2343}
2344
2345static void
2346reconnectTorrent( Torrent * t )
2347{
2348    static time_t prevTime = 0;
2349    static int    newConnectionsThisSecond = 0;
2350    time_t        now;
2351
2352    now = time( NULL );
2353    if( prevTime != now )
2354    {
2355        prevTime = now;
2356        newConnectionsThisSecond = 0;
2357    }
2358
2359    if( !t->isRunning )
2360    {
2361        removeAllPeers( t );
2362    }
2363    else
2364    {
2365        int i;
2366        int canCloseCount;
2367        int mustCloseCount;
2368        int candidateCount;
2369        int maxCandidates;
2370        struct tr_peer ** canClose = getPeersToClose( t, TR_CAN_CLOSE, &canCloseCount );
2371        struct tr_peer ** mustClose = getPeersToClose( t, TR_MUST_CLOSE, &mustCloseCount );
2372        struct peer_atom ** candidates = getPeerCandidates( t, &candidateCount );
2373
2374        tordbg( t, "reconnect pulse for [%s]: "
2375                   "%d must-close connections, "
2376                   "%d can-close connections, "
2377                   "%d connection candidates, "
2378                   "%d atoms, "
2379                   "max per pulse is %d",
2380                   tr_torrentName( t->tor ),
2381                   mustCloseCount,
2382                   canCloseCount,
2383                   candidateCount,
2384                   tr_ptrArraySize( &t->pool ),
2385                   MAX_RECONNECTIONS_PER_PULSE );
2386
2387        /* disconnect the really bad peers */
2388        for( i=0; i<mustCloseCount; ++i )
2389            closePeer( t, mustClose[i] );
2390
2391        /* decide how many peers can we try to add in this pass */
2392        maxCandidates = candidateCount;
2393        maxCandidates = MIN( maxCandidates, MAX_RECONNECTIONS_PER_PULSE );
2394        maxCandidates = MIN( maxCandidates, getMaxPeerCount( t->tor ) - getPeerCount( t ) );
2395        maxCandidates = MIN( maxCandidates, MAX_CONNECTIONS_PER_SECOND - newConnectionsThisSecond );
2396
2397        /* maybe disconnect some lesser peers, if we have candidates to replace them with */
2398        for( i=0; ( i<canCloseCount ) && ( i<maxCandidates ); ++i )
2399            closePeer( t, canClose[i] );
2400
2401        tordbg( t, "candidateCount is %d, MAX_RECONNECTIONS_PER_PULSE is %d,"
2402                   " getPeerCount(t) is %d, getMaxPeerCount(t) is %d, "
2403                   "newConnectionsThisSecond is %d, MAX_CONNECTIONS_PER_SECOND is %d",
2404                   candidateCount,
2405                   MAX_RECONNECTIONS_PER_PULSE,
2406                   getPeerCount( t ),
2407                   getMaxPeerCount( t->tor ),
2408                   newConnectionsThisSecond, MAX_CONNECTIONS_PER_SECOND );
2409
2410        /* add some new ones */
2411        for( i=0; i<maxCandidates; ++i )
2412        {
2413            tr_peerMgr        * mgr = t->manager;
2414            struct peer_atom  * atom = candidates[i];
2415            tr_peerIo         * io;
2416
2417            tordbg( t, "Starting an OUTGOING connection with %s",
2418                   tr_atomAddrStr( atom ) );
2419
2420            io = tr_peerIoNewOutgoing( mgr->session, mgr->session->bandwidth, &atom->addr, atom->port, t->tor->info.hash );
2421
2422            if( io == NULL )
2423            {
2424                tordbg( t, "peerIo not created; marking peer %s as unreachable",
2425                        tr_atomAddrStr( atom ) );
2426                atom->myflags |= MYFLAG_UNREACHABLE;
2427            }
2428            else
2429            {
2430                tr_handshake * handshake = tr_handshakeNew( io,
2431                                                            mgr->session->encryptionMode,
2432                                                            myHandshakeDoneCB,
2433                                                            mgr );
2434
2435                assert( tr_peerIoGetTorrentHash( io ) );
2436
2437                tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewOutgoing() */
2438
2439                ++newConnectionsThisSecond;
2440
2441                tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
2442                                         handshakeCompare );
2443            }
2444
2445            atom->time = time( NULL );
2446        }
2447
2448        /* cleanup */
2449        tr_free( candidates );
2450        tr_free( mustClose );
2451        tr_free( canClose );
2452    }
2453}
2454
2455struct peer_liveliness
2456{
2457    tr_peer * peer;
2458    void * clientData;
2459    time_t pieceDataTime;
2460    time_t time;
2461    int speed;
2462    tr_bool doPurge;
2463};
2464
2465static int
2466comparePeerLiveliness( const void * va, const void * vb )
2467{
2468    const struct peer_liveliness * a = va;
2469    const struct peer_liveliness * b = vb;
2470
2471    if( a->doPurge != b->doPurge )
2472        return a->doPurge ? 1 : -1;
2473
2474    if( a->speed != b->speed ) /* faster goes first */
2475        return a->speed > b->speed ? -1 : 1;
2476
2477    /* the one to give us data more recently goes first */
2478    if( a->pieceDataTime != b->pieceDataTime )
2479        return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
2480
2481    /* the one we connected to most recently goes first */
2482    if( a->time != b->time )
2483        return a->time > b->time ? -1 : 1;
2484
2485    return 0;
2486}
2487
2488static int
2489comparePeerLivelinessReverse( const void * va, const void * vb )
2490{
2491    return -comparePeerLiveliness (va, vb);
2492}
2493
2494static void
2495sortPeersByLivelinessImpl( tr_peer  ** peers,
2496                           void     ** clientData,
2497                           int         n,
2498                           uint64_t    now,
2499                           int (*compare) ( const void *va, const void *vb ) )
2500{
2501    int i;
2502    struct peer_liveliness *lives, *l;
2503
2504    /* build a sortable array of peer + extra info */
2505    lives = l = tr_new0( struct peer_liveliness, n );
2506    for( i=0; i<n; ++i, ++l )
2507    {
2508        tr_peer * p = peers[i];
2509        l->peer = p;
2510        l->doPurge = p->doPurge;
2511        l->pieceDataTime = p->atom->piece_data_time;
2512        l->time = p->atom->time;
2513        l->speed = 1024.0 * (   tr_peerGetPieceSpeed( p, now, TR_UP )
2514                              + tr_peerGetPieceSpeed( p, now, TR_DOWN ) );
2515        if( clientData )
2516            l->clientData = clientData[i];
2517    }
2518
2519    /* sort 'em */
2520    assert( n == ( l - lives ) );
2521    qsort( lives, n, sizeof( struct peer_liveliness ), compare );
2522
2523    /* build the peer array */
2524    for( i=0, l=lives; i<n; ++i, ++l ) {
2525        peers[i] = l->peer;
2526        if( clientData )
2527            clientData[i] = l->clientData;
2528    }
2529    assert( n == ( l - lives ) );
2530
2531    /* cleanup */
2532    tr_free( lives );
2533}
2534
2535static void
2536sortPeersByLiveliness( tr_peer ** peers, void ** clientData, int n, uint64_t now )
2537{
2538    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLiveliness );
2539}
2540
2541static void
2542sortPeersByLivelinessReverse( tr_peer ** peers, void ** clientData, int n, uint64_t now )
2543{
2544    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLivelinessReverse );
2545}
2546
2547
2548static void
2549enforceTorrentPeerLimit( Torrent * t, uint64_t now )
2550{
2551    int n = tr_ptrArraySize( &t->peers );
2552    const int max = tr_torrentGetPeerLimit( t->tor );
2553    if( n > max )
2554    {
2555        void * base = tr_ptrArrayBase( &t->peers );
2556        tr_peer ** peers = tr_memdup( base, n*sizeof( tr_peer* ) );
2557        sortPeersByLiveliness( peers, NULL, n, now );
2558        while( n > max )
2559            closePeer( t, peers[--n] );
2560        tr_free( peers );
2561    }
2562}
2563
2564static void
2565enforceSessionPeerLimit( tr_session * session, uint64_t now )
2566{
2567    int n = 0;
2568    tr_torrent * tor = NULL;
2569    const int max = tr_sessionGetPeerLimit( session );
2570
2571    /* count the total number of peers */
2572    while(( tor = tr_torrentNext( session, tor )))
2573        n += tr_ptrArraySize( &tor->torrentPeers->peers );
2574
2575    /* if there are too many, prune out the worst */
2576    if( n > max )
2577    {
2578        tr_peer ** peers = tr_new( tr_peer*, n );
2579        Torrent ** torrents = tr_new( Torrent*, n );
2580
2581        /* populate the peer array */
2582        n = 0;
2583        tor = NULL;
2584        while(( tor = tr_torrentNext( session, tor ))) {
2585            int i;
2586            Torrent * t = tor->torrentPeers;
2587            const int tn = tr_ptrArraySize( &t->peers );
2588            for( i=0; i<tn; ++i, ++n ) {
2589                peers[n] = tr_ptrArrayNth( &t->peers, i );
2590                torrents[n] = t;
2591            }
2592        }
2593
2594        /* sort 'em */
2595        sortPeersByLiveliness( peers, (void**)torrents, n, now );
2596
2597        /* cull out the crappiest */
2598        while( n-- > max )
2599            closePeer( torrents[n], peers[n] );
2600
2601        /* cleanup */
2602        tr_free( torrents );
2603        tr_free( peers );
2604    }
2605}
2606
2607
2608static int
2609reconnectPulse( void * vmgr )
2610{
2611    tr_torrent * tor;
2612    tr_peerMgr * mgr = vmgr;
2613    uint64_t now;
2614    managerLock( mgr );
2615
2616    now = tr_date( );
2617
2618    /* if we're over the per-torrent peer limits, cull some peers */
2619    tor = NULL;
2620    while(( tor = tr_torrentNext( mgr->session, tor )))
2621        if( tor->isRunning )
2622            enforceTorrentPeerLimit( tor->torrentPeers, now );
2623
2624    /* if we're over the per-session peer limits, cull some peers */
2625    enforceSessionPeerLimit( mgr->session, now );
2626
2627    tor = NULL;
2628    while(( tor = tr_torrentNext( mgr->session, tor )))
2629        if( tor->isRunning )
2630            reconnectTorrent( tor->torrentPeers );
2631
2632    managerUnlock( mgr );
2633    return TRUE;
2634}
2635
2636/****
2637*****
2638*****  BANDWIDTH ALLOCATION
2639*****
2640****/
2641
2642static void
2643pumpAllPeers( tr_peerMgr * mgr )
2644{
2645    tr_torrent * tor = NULL;
2646
2647    while(( tor = tr_torrentNext( mgr->session, tor )))
2648    {
2649        int j;
2650        Torrent * t = tor->torrentPeers;
2651
2652        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
2653        {
2654            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
2655            tr_peerMsgsPulse( peer->msgs );
2656        }
2657    }
2658}
2659
2660static int
2661bandwidthPulse( void * vmgr )
2662{
2663    tr_torrent * tor = NULL;
2664    tr_peerMgr * mgr = vmgr;
2665    managerLock( mgr );
2666
2667    /* FIXME: this next line probably isn't necessary... */
2668    pumpAllPeers( mgr );
2669
2670    /* allocate bandwidth to the peers */
2671    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
2672    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
2673
2674    /* possibly stop torrents that have seeded enough */
2675    while(( tor = tr_torrentNext( mgr->session, tor ))) {
2676        if( tor->needsSeedRatioCheck ) {
2677            tor->needsSeedRatioCheck = FALSE;
2678            tr_torrentCheckSeedRatio( tor );
2679        }
2680    }
2681
2682    /* possibly stop torrents that have an error */
2683    tor = NULL;
2684    while(( tor = tr_torrentNext( mgr->session, tor )))
2685        if( tor->isRunning && ( tor->error == TR_STAT_LOCAL_ERROR ))
2686            tr_torrentStop( tor );
2687
2688    managerUnlock( mgr );
2689    return TRUE;
2690}
Note: See TracBrowser for help on using the repository browser.