source: trunk/libtransmission/peer-mgr.c @ 7289

Last change on this file since 7289 was 7289, checked in by charles, 12 years ago

(libT) minor cleanup: using tr_bool, fixing up bad code indentation, using %zu on size_t instead of casting to int...

  • Property svn:keywords set to Date Rev Author Id
File size: 64.4 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 7289 2008-12-05 22:56:19Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h> /* memcpy, memcmp, strstr */
16#include <stdlib.h> /* qsort */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "bandwidth.h"
23#include "bencode.h"
24#include "blocklist.h"
25#include "clients.h"
26#include "completion.h"
27#include "crypto.h"
28#include "handshake.h"
29#include "inout.h" /* tr_ioTestPiece */
30#include "net.h"
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-mgr-private.h"
34#include "peer-msgs.h"
35#include "ptrarray.h"
36#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
37#include "torrent.h"
38#include "trevent.h"
39#include "utils.h"
40#include "webseed.h"
41
42enum
43{
44    /* how frequently to change which peers are choked */
45    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
46
47    /* minimum interval for refilling peers' request lists */
48    REFILL_PERIOD_MSEC = 333,
49
50    /* when many peers are available, keep idle ones this long */
51    MIN_UPLOAD_IDLE_SECS = ( 60 * 3 ),
52
53    /* when few peers are available, keep idle ones this long */
54    MAX_UPLOAD_IDLE_SECS = ( 60 * 10 ),
55
56    /* how frequently to decide which peers live and die */
57    RECONNECT_PERIOD_MSEC = ( 2 * 1000 ),
58   
59    /* how frequently to reallocate bandwidth */
60    BANDWIDTH_PERIOD_MSEC = 250,
61
62    /* max # of peers to ask fer per torrent per reconnect pulse */
63    MAX_RECONNECTIONS_PER_PULSE = 4,
64
65    /* max number of peers to ask for per second overall.
66    * this throttle is to avoid overloading the router */
67    MAX_CONNECTIONS_PER_SECOND = 8,
68
69    /* number of unchoked peers per torrent.
70     * FIXME: this probably ought to be configurable */
71    MAX_UNCHOKED_PEERS = 14,
72
73    /* number of bad pieces a peer is allowed to send before we ban them */
74    MAX_BAD_PIECES_PER_PEER = 5,
75
76    /* use for bitwise operations w/peer_atom.myflags */
77    MYFLAG_BANNED = 1,
78
79    /* unreachable for now... but not banned.
80     * if they try to connect to us it's okay */
81    MYFLAG_UNREACHABLE = 2,
82
83    /* the minimum we'll wait before attempting to reconnect to a peer */
84    MINIMUM_RECONNECT_INTERVAL_SECS = 5
85};
86
87
88/**
89***
90**/
91
92enum
93{
94    UPLOAD_ONLY_UKNOWN,
95    UPLOAD_ONLY_YES,
96    UPLOAD_ONLY_NO
97};
98
99/* We keep one of these for every peer we know about, whether
100 * it's connected or not, so the struct must be small.
101 * When our current connections underperform, we dip back
102 * into this list for new ones. */
103struct peer_atom
104{
105    uint8_t     from;
106    uint8_t     flags;       /* these match the added_f flags */
107    uint8_t     myflags;     /* flags that aren't defined in added_f */
108    uint8_t     uploadOnly;  /* UPLOAD_ONLY_ */
109    uint8_t     partialSeed;
110    tr_port     port;
111    uint16_t    numFails;
112    tr_address  addr;
113    time_t      time;        /* when the peer's connection status last changed */
114    time_t      piece_data_time;
115};
116
117typedef struct
118{
119    tr_bool         isRunning;
120
121    uint8_t         hash[SHA_DIGEST_LENGTH];
122    int         *   pendingRequestCount;
123    tr_ptrArray *   outgoingHandshakes; /* tr_handshake */
124    tr_ptrArray *   pool; /* struct peer_atom */
125    tr_ptrArray *   peers; /* tr_peer */
126    tr_ptrArray *   webseeds; /* tr_webseed */
127    tr_timer *      reconnectTimer;
128    tr_timer *      rechokeTimer;
129    tr_timer *      refillTimer;
130    tr_torrent *    tor;
131    tr_peer *       optimistic; /* the optimistic peer, or NULL if none */
132
133    struct tr_peerMgr * manager;
134}
135Torrent;
136
137struct tr_peerMgr
138{
139    tr_session      * session;
140    tr_ptrArray     * torrents; /* Torrent */
141    tr_ptrArray     * incomingHandshakes; /* tr_handshake */
142    tr_timer        * bandwidthTimer;
143};
144
145#define tordbg( t, ... ) \
146    do { \
147        if( tr_deepLoggingIsActive( ) ) \
148            tr_deepLog( __FILE__, __LINE__, t->tor->info.name, __VA_ARGS__ ); \
149    } while( 0 )
150
151#define dbgmsg( ... ) \
152    do { \
153        if( tr_deepLoggingIsActive( ) ) \
154            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
155    } while( 0 )
156
157/**
158***
159**/
160
161static void
162managerLock( const struct tr_peerMgr * manager )
163{
164    tr_globalLock( manager->session );
165}
166
167static void
168managerUnlock( const struct tr_peerMgr * manager )
169{
170    tr_globalUnlock( manager->session );
171}
172
173static void
174torrentLock( Torrent * torrent )
175{
176    managerLock( torrent->manager );
177}
178
179static void
180torrentUnlock( Torrent * torrent )
181{
182    managerUnlock( torrent->manager );
183}
184
185static int
186torrentIsLocked( const Torrent * t )
187{
188    return tr_globalIsLocked( t->manager->session );
189}
190
191/**
192***
193**/
194
195static int
196handshakeCompareToAddr( const void * va,
197                        const void * vb )
198{
199    const tr_handshake * a = va;
200
201    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
202}
203
204static int
205handshakeCompare( const void * a,
206                  const void * b )
207{
208    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
209}
210
211static tr_handshake*
212getExistingHandshake( tr_ptrArray      * handshakes,
213                      const tr_address * addr )
214{
215    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
216}
217
218static int
219comparePeerAtomToAddress( const void * va, const void * vb )
220{
221    const struct peer_atom * a = va;
222
223    return tr_compareAddresses( &a->addr, vb );
224}
225
226static int
227comparePeerAtoms( const void * va, const void * vb )
228{
229    const struct peer_atom * b = vb;
230
231    return comparePeerAtomToAddress( va, &b->addr );
232}
233
234/**
235***
236**/
237
238static int
239torrentCompare( const void * va,
240                const void * vb )
241{
242    const Torrent * a = va;
243    const Torrent * b = vb;
244
245    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
246}
247
248static int
249torrentCompareToHash( const void * va,
250                      const void * vb )
251{
252    const Torrent * a = va;
253    const uint8_t * b_hash = vb;
254
255    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
256}
257
258static Torrent*
259getExistingTorrent( tr_peerMgr *    manager,
260                    const uint8_t * hash )
261{
262    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
263                                             hash,
264                                             torrentCompareToHash );
265}
266
267static int
268peerCompare( const void * va, const void * vb )
269{
270    const tr_peer * a = va;
271    const tr_peer * b = vb;
272
273    return tr_compareAddresses( &a->addr, &b->addr );
274}
275
276static int
277peerCompareToAddr( const void * va, const void * vb )
278{
279    const tr_peer * a = va;
280
281    return tr_compareAddresses( &a->addr, vb );
282}
283
284static tr_peer*
285getExistingPeer( Torrent          * torrent,
286                 const tr_address * addr )
287{
288    assert( torrentIsLocked( torrent ) );
289    assert( addr );
290
291    return tr_ptrArrayFindSorted( torrent->peers, addr, peerCompareToAddr );
292}
293
294static struct peer_atom*
295getExistingAtom( const Torrent    * t,
296                 const tr_address * addr )
297{
298    assert( torrentIsLocked( t ) );
299    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
300}
301
302static tr_bool
303peerIsInUse( const Torrent    * ct,
304             const tr_address * addr )
305{
306    Torrent * t = (Torrent*) ct;
307
308    assert( torrentIsLocked ( t ) );
309
310    return getExistingPeer( t, addr )
311        || getExistingHandshake( t->outgoingHandshakes, addr )
312        || getExistingHandshake( t->manager->incomingHandshakes, addr );
313}
314
315static tr_peer*
316peerConstructor( tr_torrent * tor, const tr_address * addr )
317{
318    tr_peer * p;
319    p = tr_new0( tr_peer, 1 );
320    p->addr = *addr;
321    p->bandwidth = tr_bandwidthNew( tor->session, tor->bandwidth );
322    return p;
323}
324
325static tr_peer*
326getPeer( Torrent          * torrent,
327         const tr_address * addr )
328{
329    tr_peer * peer;
330
331    assert( torrentIsLocked( torrent ) );
332
333    peer = getExistingPeer( torrent, addr );
334
335    if( peer == NULL )
336    {
337        peer = peerConstructor( torrent->tor, addr );
338        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
339    }
340
341    return peer;
342}
343
344static void
345peerDestructor( tr_peer * peer )
346{
347    assert( peer );
348
349    if( peer->msgs != NULL )
350    {
351        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
352        tr_peerMsgsFree( peer->msgs );
353    }
354
355    tr_peerIoFree( peer->io );
356
357    tr_bitfieldFree( peer->have );
358    tr_bitfieldFree( peer->blame );
359    tr_free( peer->client );
360
361    tr_bandwidthFree( peer->bandwidth );
362
363    tr_free( peer );
364}
365
366static void
367removePeer( Torrent * t,
368            tr_peer * peer )
369{
370    tr_peer *          removed;
371    struct peer_atom * atom;
372
373    assert( torrentIsLocked( t ) );
374
375    atom = getExistingAtom( t, &peer->addr );
376    assert( atom );
377    atom->time = time( NULL );
378
379    removed = tr_ptrArrayRemoveSorted( t->peers, peer, peerCompare );
380    assert( removed == peer );
381    peerDestructor( removed );
382}
383
384static void
385removeAllPeers( Torrent * t )
386{
387    while( !tr_ptrArrayEmpty( t->peers ) )
388        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
389}
390
391static void
392torrentDestructor( void * vt )
393{
394    Torrent * t = vt;
395    uint8_t   hash[SHA_DIGEST_LENGTH];
396
397    assert( t );
398    assert( !t->isRunning );
399    assert( t->peers );
400    assert( torrentIsLocked( t ) );
401    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
402    assert( tr_ptrArrayEmpty( t->peers ) );
403
404    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
405
406    tr_timerFree( &t->reconnectTimer );
407    tr_timerFree( &t->rechokeTimer );
408    tr_timerFree( &t->refillTimer );
409
410    tr_ptrArrayFree( t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
411    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
412    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
413    tr_ptrArrayFree( t->peers, NULL );
414
415    tr_free( t->pendingRequestCount );
416    tr_free( t );
417}
418
419static void peerCallbackFunc( void * vpeer,
420                              void * vevent,
421                              void * vt );
422
423static Torrent*
424torrentConstructor( tr_peerMgr * manager,
425                    tr_torrent * tor )
426{
427    int       i;
428    Torrent * t;
429
430    t = tr_new0( Torrent, 1 );
431    t->manager = manager;
432    t->tor = tor;
433    t->pool = tr_ptrArrayNew( );
434    t->peers = tr_ptrArrayNew( );
435    t->webseeds = tr_ptrArrayNew( );
436    t->outgoingHandshakes = tr_ptrArrayNew( );
437    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
438
439    for( i = 0; i < tor->info.webseedCount; ++i )
440    {
441        tr_webseed * w =
442            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc,
443                           t );
444        tr_ptrArrayAppend( t->webseeds, w );
445    }
446
447    return t;
448}
449
450
451static int bandwidthPulse( void * vmgr );
452
453
454tr_peerMgr*
455tr_peerMgrNew( tr_session * session )
456{
457    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
458
459    m->session = session;
460    m->torrents = tr_ptrArrayNew( );
461    m->incomingHandshakes = tr_ptrArrayNew( );
462    m->bandwidthTimer = tr_timerNew( session, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
463    return m;
464}
465
466void
467tr_peerMgrFree( tr_peerMgr * manager )
468{
469    managerLock( manager );
470
471    tr_timerFree( &manager->bandwidthTimer );
472
473    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
474     * the item from manager->handshakes, so this is a little roundabout... */
475    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
476        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
477
478    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
479
480    /* free the torrents. */
481    tr_ptrArrayFree( manager->torrents, torrentDestructor );
482
483    managerUnlock( manager );
484    tr_free( manager );
485}
486
487static tr_peer**
488getConnectedPeers( Torrent * t, int * setmeCount )
489{
490    int i, peerCount, connectionCount;
491    tr_peer **peers;
492    tr_peer **ret;
493
494    assert( torrentIsLocked( t ) );
495
496    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
497    ret = tr_new( tr_peer *, peerCount );
498
499    for( i = connectionCount = 0; i < peerCount; ++i )
500        if( peers[i]->msgs )
501            ret[connectionCount++] = peers[i];
502
503    *setmeCount = connectionCount;
504    return ret;
505}
506
507static int
508clientIsDownloadingFrom( const tr_peer * peer )
509{
510    return peer->clientIsInterested && !peer->clientIsChoked;
511}
512
513static int
514clientIsUploadingTo( const tr_peer * peer )
515{
516    return peer->peerIsInterested && !peer->peerIsChoked;
517}
518
519/***
520****
521***/
522
523tr_bool
524tr_peerMgrPeerIsSeed( const tr_peerMgr   * mgr,
525                      const uint8_t      * torrentHash,
526                      const tr_address   * addr )
527{
528    tr_bool isSeed = FALSE;
529    const Torrent * t = NULL;
530    const struct peer_atom * atom = NULL;
531
532    t = getExistingTorrent( (tr_peerMgr*)mgr, torrentHash );
533    if( t )
534        atom = getExistingAtom( t, addr );
535    if( atom )
536        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
537
538    return isSeed;
539}
540
541/****
542*****
543*****  REFILL
544*****
545****/
546
547static void
548assertValidPiece( Torrent * t, tr_piece_index_t piece )
549{
550    assert( t );
551    assert( t->tor );
552    assert( piece < t->tor->info.pieceCount );
553}
554
555static int
556getPieceRequests( Torrent * t, tr_piece_index_t piece )
557{
558    assertValidPiece( t, piece );
559
560    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
561}
562
563static void
564incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
565{
566    assertValidPiece( t, piece );
567
568    if( t->pendingRequestCount == NULL )
569        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
570    t->pendingRequestCount[piece]++;
571}
572
573static void
574decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
575{
576    assertValidPiece( t, piece );
577
578    if( t->pendingRequestCount )
579        t->pendingRequestCount[piece]--;
580}
581
582struct tr_refill_piece
583{
584    tr_priority_t    priority;
585    uint32_t         piece;
586    uint32_t         peerCount;
587    int              random;
588    int              pendingRequestCount;
589    int              missingBlockCount;
590};
591
592static int
593compareRefillPiece( const void * aIn, const void * bIn )
594{
595    const struct tr_refill_piece * a = aIn;
596    const struct tr_refill_piece * b = bIn;
597
598    /* if one piece has a higher priority, it goes first */
599    if( a->priority != b->priority )
600        return a->priority > b->priority ? -1 : 1;
601
602    /* have a per-priority endgame */
603    if( a->pendingRequestCount != b->pendingRequestCount )
604        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
605
606    /* fewer missing pieces goes first */
607    if( a->missingBlockCount != b->missingBlockCount )
608        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
609
610    /* otherwise if one has fewer peers, it goes first */
611    if( a->peerCount != b->peerCount )
612        return a->peerCount < b->peerCount ? -1 : 1;
613
614    /* otherwise go with our random seed */
615    if( a->random != b->random )
616        return a->random < b->random ? -1 : 1;
617
618    return 0;
619}
620
621static tr_piece_index_t *
622getPreferredPieces( Torrent * t, tr_piece_index_t  * pieceCount )
623{
624    const tr_torrent  * tor = t->tor;
625    const tr_info     * inf = &tor->info;
626    tr_piece_index_t    i;
627    tr_piece_index_t    poolSize = 0;
628    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
629    int                 peerCount;
630    tr_peer**           peers;
631
632    assert( torrentIsLocked( t ) );
633
634    peers = getConnectedPeers( t, &peerCount );
635
636    /* make a list of the pieces that we want but don't have */
637    for( i = 0; i < inf->pieceCount; ++i )
638        if( !tor->info.pieces[i].dnd
639                && !tr_cpPieceIsComplete( tor->completion, i ) )
640            pool[poolSize++] = i;
641
642    /* sort the pool by which to request next */
643    if( poolSize > 1 )
644    {
645        tr_piece_index_t j;
646        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
647
648        for( j = 0; j < poolSize; ++j )
649        {
650            int k;
651            const tr_piece_index_t piece = pool[j];
652            struct tr_refill_piece * setme = p + j;
653
654            setme->piece = piece;
655            setme->priority = inf->pieces[piece].priority;
656            setme->peerCount = 0;
657            setme->random = tr_cryptoWeakRandInt( INT_MAX );
658            setme->pendingRequestCount = getPieceRequests( t, piece );
659            setme->missingBlockCount
660                         = tr_cpMissingBlocksInPiece( tor->completion, piece );
661
662            for( k = 0; k < peerCount; ++k )
663            {
664                const tr_peer * peer = peers[k];
665                if( peer->peerIsInterested
666                        && !peer->clientIsChoked
667                        && tr_bitfieldHas( peer->have, piece ) )
668                    ++setme->peerCount;
669            }
670        }
671
672        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
673               compareRefillPiece );
674
675        for( j = 0; j < poolSize; ++j )
676            pool[j] = p[j].piece;
677
678        tr_free( p );
679    }
680
681    tr_free( peers );
682
683    *pieceCount = poolSize;
684    return pool;
685}
686
687struct tr_blockIterator
688{
689    Torrent * t;
690    tr_block_index_t blockIndex, blockCount, *blocks;
691    tr_piece_index_t pieceIndex, pieceCount, *pieces;
692};
693
694static struct tr_blockIterator*
695blockIteratorNew( Torrent * t )
696{
697    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
698    i->t = t;
699    i->pieces = getPreferredPieces( t, &i->pieceCount );
700    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCount );
701    return i;
702}
703
704static int
705blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
706{
707    int found;
708    Torrent * t = i->t;
709    tr_torrent * tor = t->tor;
710
711    while( ( i->blockIndex == i->blockCount )
712        && ( i->pieceIndex < i->pieceCount ) )
713    {
714        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
715        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
716        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
717        tr_block_index_t block;
718
719        assert( index < tor->info.pieceCount );
720
721        i->blockCount = 0;
722        i->blockIndex = 0;
723        for( block=b; block!=e; ++block )
724            if( !tr_cpBlockIsComplete( tor->completion, block ) )
725                i->blocks[i->blockCount++] = block;
726    }
727
728    if(( found = ( i->blockIndex < i->blockCount )))
729        *setme = i->blocks[i->blockIndex++];
730
731    return found;
732}
733
734static void
735blockIteratorFree( struct tr_blockIterator * i )
736{
737    tr_free( i->blocks );
738    tr_free( i->pieces );
739    tr_free( i );
740}
741
742static tr_peer**
743getPeersUploadingToClient( Torrent * t,
744                           int *     setmeCount )
745{
746    int j;
747    int peerCount = 0;
748    int retCount = 0;
749    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
750    tr_peer ** ret = tr_new( tr_peer *, peerCount );
751
752    j = 0; /* this is a temporary test to make sure we walk through all the peers */
753    if( peerCount )
754    {
755        /* Get a list of peers we're downloading from.
756           Pick a different starting point each time so all peers
757           get a chance at being the first in line */
758        const int fencepost = tr_cryptoWeakRandInt( peerCount );
759        int i = fencepost;
760        do {
761            if( clientIsDownloadingFrom( peers[i] ) )
762                ret[retCount++] = peers[i];
763            i = ( i + 1 ) % peerCount;
764            ++j;
765        } while( i != fencepost );
766    }
767    assert( j == peerCount );
768    *setmeCount = retCount;
769    return ret;
770}
771
772static uint32_t
773getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
774{
775    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
776    const uint64_t blockPos = tor->blockSize * b;
777    assert( blockPos >= piecePos );
778    return (uint32_t)( blockPos - piecePos );
779}
780
781static int
782refillPulse( void * vtorrent )
783{
784    tr_block_index_t block;
785    int peerCount;
786    int webseedCount;
787    tr_peer ** peers;
788    tr_webseed ** webseeds;
789    struct tr_blockIterator * blockIterator;
790    Torrent * t = vtorrent;
791    tr_torrent * tor = t->tor;
792
793    if( !t->isRunning )
794        return TRUE;
795    if( tr_torrentIsSeed( t->tor ) )
796        return TRUE;
797
798    torrentLock( t );
799    tordbg( t, "Refilling Request Buffers..." );
800
801    blockIterator = blockIteratorNew( t );
802    peers = getPeersUploadingToClient( t, &peerCount );
803    webseedCount = tr_ptrArraySize( t->webseeds );
804    webseeds = tr_memdup( tr_ptrArrayBase( t->webseeds ),
805                          webseedCount * sizeof( tr_webseed* ) );
806
807    while( ( webseedCount || peerCount )
808        && blockIteratorNext( blockIterator, &block ) )
809    {
810        int j;
811        int handled = FALSE;
812
813        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
814        const uint32_t offset = getBlockOffsetInPiece( tor, block );
815        const uint32_t length = tr_torBlockCountBytes( tor, block );
816
817        /* find a peer who can ask for this block */
818        for( j=0; !handled && j<peerCount; )
819        {
820            const tr_addreq_t val = tr_peerMsgsAddRequest( peers[j]->msgs, index, offset, length, FALSE );
821            switch( val )
822            {
823                case TR_ADDREQ_FULL:
824                case TR_ADDREQ_CLIENT_CHOKED:
825                    peers[j] = peers[--peerCount];
826                    break;
827
828                case TR_ADDREQ_MISSING:
829                case TR_ADDREQ_DUPLICATE:
830                    ++j;
831                    break;
832
833                case TR_ADDREQ_OK:
834                    incrementPieceRequests( t, index );
835                    handled = TRUE;
836                    break;
837
838                default:
839                    assert( 0 && "unhandled value" );
840                    break;
841            }
842        }
843
844        /* maybe one of the webseeds can do it */
845        for( j=0; !handled && j<webseedCount; )
846        {
847            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j],
848                                                          index, offset, length );
849            switch( val )
850            {
851                case TR_ADDREQ_FULL:
852                    webseeds[j] = webseeds[--webseedCount];
853                    break;
854
855                case TR_ADDREQ_OK:
856                    incrementPieceRequests( t, index );
857                    handled = TRUE;
858                    break;
859
860                default:
861                    assert( 0 && "unhandled value" );
862                    break;
863            }
864        }
865    }
866
867    /* cleanup */
868    blockIteratorFree( blockIterator );
869    tr_free( webseeds );
870    tr_free( peers );
871
872    t->refillTimer = NULL;
873    torrentUnlock( t );
874    return FALSE;
875}
876
877static void
878broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
879{
880    int i, size;
881    tr_peer ** peers;
882
883    assert( torrentIsLocked( t ) );
884
885    peers = getConnectedPeers( t, &size );
886    for( i=0; i<size; ++i )
887        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
888    tr_free( peers );
889}
890
891static void
892addStrike( Torrent * t,
893           tr_peer * peer )
894{
895    tordbg( t, "increasing peer %s strike count to %d",
896            tr_peerIoAddrStr( &peer->addr,
897                              peer->port ), peer->strikes + 1 );
898
899    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
900    {
901        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
902        atom->myflags |= MYFLAG_BANNED;
903        peer->doPurge = 1;
904        tordbg( t, "banning peer %s",
905               tr_peerIoAddrStr( &atom->addr, atom->port ) );
906    }
907}
908
909static void
910gotBadPiece( Torrent *        t,
911             tr_piece_index_t pieceIndex )
912{
913    tr_torrent *   tor = t->tor;
914    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
915
916    tor->corruptCur += byteCount;
917    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
918}
919
920static void
921refillSoon( Torrent * t )
922{
923    if( t->refillTimer == NULL )
924        t->refillTimer = tr_timerNew( t->manager->session,
925                                      refillPulse, t,
926                                      REFILL_PERIOD_MSEC );
927}
928
929static void
930peerSuggestedPiece( Torrent            * t,
931                    tr_peer            * peer,
932                    tr_piece_index_t     pieceIndex,
933                    int                  isFastAllowed )
934{
935    assert( t );
936    assert( peer );
937    assert( peer->msgs );
938
939    /* is this a valid piece? */
940    if(  pieceIndex >= t->tor->info.pieceCount )
941        return;
942
943    /* don't ask for it if we've already got it */
944    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
945        return;
946
947    /* don't ask for it if they don't have it */
948    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
949        return;
950
951    /* don't ask for it if we're choked and it's not fast */
952    if( !isFastAllowed && peer->clientIsChoked )
953        return;
954
955    /* request the blocks that we don't have in this piece */
956    {
957        tr_block_index_t block;
958        const tr_torrent * tor = t->tor;
959        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
960        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
961
962        for( block=start; block<end; ++block )
963        {
964            if( !tr_cpBlockIsComplete( tor->completion, block ) )
965            {
966                const uint32_t offset = getBlockOffsetInPiece( tor, block );
967                const uint32_t length = tr_torBlockCountBytes( tor, block );
968                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length, TRUE );
969                incrementPieceRequests( t, pieceIndex );
970            }
971        }
972    }
973}
974
975static void
976peerCallbackFunc( void * vpeer,
977                  void * vevent,
978                  void * vt )
979{
980    tr_peer             * peer = vpeer; /* may be NULL if peer is a webseed */
981    Torrent             * t = vt;
982    const tr_peer_event * e = vevent;
983
984    torrentLock( t );
985
986    switch( e->eventType )
987    {
988        case TR_PEER_UPLOAD_ONLY:
989            /* update our atom */
990            if( peer ) {
991                struct peer_atom * a = getExistingAtom( t, &peer->addr );
992                a->uploadOnly = e->uploadOnly ? UPLOAD_ONLY_YES : UPLOAD_ONLY_NO;
993            }
994            break;
995
996        case TR_PEER_NEED_REQ:
997            refillSoon( t );
998            break;
999
1000        case TR_PEER_CANCEL:
1001            decrementPieceRequests( t, e->pieceIndex );
1002            break;
1003
1004        case TR_PEER_PEER_GOT_DATA:
1005        {
1006            const time_t now = time( NULL );
1007            tr_torrent * tor = t->tor;
1008
1009            tor->activityDate = now;
1010
1011            if( e->wasPieceData )
1012                tor->uploadedCur += e->length;
1013
1014            /* update the stats */
1015            if( e->wasPieceData )
1016                tr_statsAddUploaded( tor->session, e->length );
1017
1018            /* update our atom */
1019            if( peer ) {
1020                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1021                a->piece_data_time = now;
1022            }
1023
1024            break;
1025        }
1026
1027        case TR_PEER_CLIENT_GOT_SUGGEST:
1028            if( peer )
1029                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1030            break;
1031
1032        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1033            if( peer )
1034                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1035            break;
1036
1037        case TR_PEER_CLIENT_GOT_DATA:
1038        {
1039            const time_t now = time( NULL );
1040            tr_torrent * tor = t->tor;
1041            tor->activityDate = now;
1042
1043            /* only add this to downloadedCur if we got it from a peer --
1044             * webseeds shouldn't count against our ratio.  As one tracker
1045             * admin put it, "Those pieces are downloaded directly from the
1046             * content distributor, not the peers, it is the tracker's job
1047             * to manage the swarms, not the web server and does not fit
1048             * into the jurisdiction of the tracker." */
1049            if( peer && e->wasPieceData )
1050                tor->downloadedCur += e->length;
1051
1052            /* update the stats */ 
1053            if( e->wasPieceData )
1054                tr_statsAddDownloaded( tor->session, e->length );
1055
1056            /* update our atom */
1057            if( peer ) {
1058                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1059                a->piece_data_time = now;
1060            }
1061
1062            break;
1063        }
1064
1065        case TR_PEER_PEER_PROGRESS:
1066        {
1067            if( peer )
1068            {
1069                struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1070                const int peerIsSeed = e->progress >= 1.0;
1071                if( peerIsSeed ) {
1072                    tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1073                    atom->flags |= ADDED_F_SEED_FLAG;
1074                } else {
1075                    tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1076                    atom->flags &= ~ADDED_F_SEED_FLAG;
1077                }
1078            }
1079            break;
1080        }
1081
1082        case TR_PEER_CLIENT_GOT_BLOCK:
1083        {
1084            tr_torrent *     tor = t->tor;
1085
1086            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1087
1088            tr_cpBlockAdd( tor->completion, block );
1089            decrementPieceRequests( t, e->pieceIndex );
1090
1091            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1092
1093            if( tr_cpPieceIsComplete( tor->completion, e->pieceIndex ) )
1094            {
1095                const tr_piece_index_t p = e->pieceIndex;
1096                const tr_bool ok = tr_ioTestPiece( tor, p );
1097
1098                if( !ok )
1099                {
1100                    tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1101                               (unsigned long)p );
1102                }
1103
1104                tr_torrentSetHasPiece( tor, p, ok );
1105                tr_torrentSetPieceChecked( tor, p, TRUE );
1106                tr_peerMgrSetBlame( tor->session->peerMgr, tor->info.hash, p, ok );
1107
1108                if( !ok )
1109                    gotBadPiece( t, p );
1110                else
1111                {
1112                    int i, peerCount;
1113                    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1114                    for( i = 0; i < peerCount; ++i )
1115                        tr_peerMsgsHave( peers[i]->msgs, p );
1116                    tr_free( peers );
1117                }
1118
1119                tr_torrentRecheckCompleteness( tor );
1120            }
1121            break;
1122        }
1123
1124        case TR_PEER_ERROR:
1125            if( e->err == EINVAL )
1126            {
1127                addStrike( t, peer );
1128                peer->doPurge = 1;
1129            }
1130            else if( ( e->err == ERANGE )
1131                  || ( e->err == EMSGSIZE )
1132                  || ( e->err == ENOTCONN ) )
1133            {
1134                /* some protocol error from the peer */
1135                peer->doPurge = 1;
1136            }
1137            else /* a local error, such as an IO error */
1138            {
1139                t->tor->error = e->err;
1140                tr_strlcpy( t->tor->errorString,
1141                            tr_strerror( t->tor->error ),
1142                            sizeof( t->tor->errorString ) );
1143                tr_torrentStop( t->tor );
1144            }
1145            break;
1146
1147        default:
1148            assert( 0 );
1149    }
1150
1151    torrentUnlock( t );
1152}
1153
1154static void
1155ensureAtomExists( Torrent          * t,
1156                  const tr_address * addr,
1157                  tr_port            port,
1158                  uint8_t            flags,
1159                  uint8_t            from )
1160{
1161    if( getExistingAtom( t, addr ) == NULL )
1162    {
1163        struct peer_atom * a;
1164        a = tr_new0( struct peer_atom, 1 );
1165        a->addr = *addr;
1166        a->port = port;
1167        a->flags = flags;
1168        a->from = from;
1169        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr( &a->addr, a->port ) );
1170        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
1171    }
1172}
1173
1174static int
1175getMaxPeerCount( const tr_torrent * tor )
1176{
1177    return tor->maxConnectedPeers;
1178}
1179
1180static int
1181getPeerCount( const Torrent * t )
1182{
1183    return tr_ptrArraySize( t->peers ) + tr_ptrArraySize( t->outgoingHandshakes );
1184}
1185
1186/* FIXME: this is kind of a mess. */
1187static tr_bool
1188myHandshakeDoneCB( tr_handshake *  handshake,
1189                   tr_peerIo *     io,
1190                   int             isConnected,
1191                   const uint8_t * peer_id,
1192                   void *          vmanager )
1193{
1194    tr_bool            ok = isConnected;
1195    tr_bool            success = FALSE;
1196    tr_port            port;
1197    const tr_address * addr;
1198    tr_peerMgr       * manager = vmanager;
1199    Torrent          * t;
1200    tr_handshake     * ours;
1201
1202    assert( io );
1203    assert( isConnected == 0 || isConnected == 1 );
1204
1205    t = tr_peerIoHasTorrentHash( io )
1206        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1207        : NULL;
1208
1209    if( tr_peerIoIsIncoming ( io ) )
1210        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
1211                                        handshake, handshakeCompare );
1212    else if( t )
1213        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
1214                                        handshake, handshakeCompare );
1215    else
1216        ours = handshake;
1217
1218    assert( ours );
1219    assert( ours == handshake );
1220
1221    if( t )
1222        torrentLock( t );
1223
1224    addr = tr_peerIoGetAddress( io, &port );
1225
1226    if( !ok || !t || !t->isRunning )
1227    {
1228        if( t )
1229        {
1230            struct peer_atom * atom = getExistingAtom( t, addr );
1231            if( atom )
1232                ++atom->numFails;
1233        }
1234
1235        tr_peerIoFree( io );
1236    }
1237    else /* looking good */
1238    {
1239        struct peer_atom * atom;
1240        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1241        atom = getExistingAtom( t, addr );
1242        atom->time = time( NULL );
1243
1244        if( atom->myflags & MYFLAG_BANNED )
1245        {
1246            tordbg( t, "banned peer %s tried to reconnect",
1247                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
1248            tr_peerIoFree( io );
1249        }
1250        else if( tr_peerIoIsIncoming( io )
1251               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1252
1253        {
1254            tr_peerIoFree( io );
1255        }
1256        else
1257        {
1258            tr_peer * peer = getExistingPeer( t, addr );
1259
1260            if( peer ) /* we already have this peer */
1261            {
1262                tr_peerIoFree( io );
1263            }
1264            else
1265            {
1266                peer = getPeer( t, addr );
1267                tr_free( peer->client );
1268
1269                if( !peer_id )
1270                    peer->client = NULL;
1271                else {
1272                    char client[128];
1273                    tr_clientForId( client, sizeof( client ), peer_id );
1274                    peer->client = tr_strdup( client );
1275                }
1276
1277                peer->port = port;
1278                peer->io = io;
1279                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1280                tr_peerIoSetBandwidth( io, peer->bandwidth );
1281
1282                success = TRUE;
1283            }
1284        }
1285    }
1286
1287    if( t )
1288        torrentUnlock( t );
1289
1290    return success;
1291}
1292
1293void
1294tr_peerMgrAddIncoming( tr_peerMgr * manager,
1295                       tr_address * addr,
1296                       tr_port      port,
1297                       int          socket )
1298{
1299    managerLock( manager );
1300
1301    if( tr_sessionIsAddressBlocked( manager->session, addr ) )
1302    {
1303        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
1304        tr_netClose( socket );
1305    }
1306    else if( getExistingHandshake( manager->incomingHandshakes, addr ) )
1307    {
1308        tr_netClose( socket );
1309    }
1310    else /* we don't have a connetion to them yet... */
1311    {
1312        tr_peerIo *    io;
1313        tr_handshake * handshake;
1314
1315        io = tr_peerIoNewIncoming( manager->session, addr, port, socket );
1316
1317        handshake = tr_handshakeNew( io,
1318                                     manager->session->encryptionMode,
1319                                     myHandshakeDoneCB,
1320                                     manager );
1321
1322        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake,
1323                                 handshakeCompare );
1324    }
1325
1326    managerUnlock( manager );
1327}
1328
1329void
1330tr_peerMgrAddPex( tr_peerMgr *    manager,
1331                  const uint8_t * torrentHash,
1332                  uint8_t         from,
1333                  const tr_pex *  pex )
1334{
1335    Torrent * t;
1336
1337    managerLock( manager );
1338
1339    t = getExistingTorrent( manager, torrentHash );
1340    if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
1341        ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from );
1342
1343    managerUnlock( manager );
1344}
1345
1346tr_pex *
1347tr_peerMgrCompactToPex( const void *    compact,
1348                        size_t          compactLen,
1349                        const uint8_t * added_f,
1350                        size_t          added_f_len,
1351                        size_t *        pexCount )
1352{
1353    size_t          i;
1354    size_t          n = compactLen / 6;
1355    const uint8_t * walk = compact;
1356    tr_pex *        pex = tr_new0( tr_pex, n );
1357
1358    for( i = 0; i < n; ++i )
1359    {
1360        pex[i].addr.type = TR_AF_INET;
1361        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
1362        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1363        if( added_f && ( n == added_f_len ) )
1364            pex[i].flags = added_f[i];
1365    }
1366
1367    *pexCount = n;
1368    return pex;
1369}
1370
1371/**
1372***
1373**/
1374
1375void
1376tr_peerMgrSetBlame( tr_peerMgr *     manager,
1377                    const uint8_t *  torrentHash,
1378                    tr_piece_index_t pieceIndex,
1379                    int              success )
1380{
1381    if( !success )
1382    {
1383        int        peerCount, i;
1384        Torrent *  t = getExistingTorrent( manager, torrentHash );
1385        tr_peer ** peers;
1386
1387        assert( torrentIsLocked( t ) );
1388
1389        peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1390        for( i = 0; i < peerCount; ++i )
1391        {
1392            tr_peer * peer = peers[i];
1393            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1394            {
1395                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1396                        tr_peerIoAddrStr( &peer->addr, peer->port ),
1397                        pieceIndex, (int)peer->strikes + 1 );
1398                addStrike( t, peer );
1399            }
1400        }
1401    }
1402}
1403
1404int
1405tr_pexCompare( const void * va, const void * vb )
1406{
1407    const tr_pex * a = va;
1408    const tr_pex * b = vb;
1409    int i = tr_compareAddresses( &a->addr, &b->addr );
1410
1411    if( i ) return i;
1412    if( a->port < b->port ) return -1;
1413    if( a->port > b->port ) return 1;
1414    return 0;
1415}
1416
1417static int
1418peerPrefersCrypto( const tr_peer * peer )
1419{
1420    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1421        return TRUE;
1422
1423    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1424        return FALSE;
1425
1426    return tr_peerIoIsEncrypted( peer->io );
1427}
1428
1429int
1430tr_peerMgrGetPeers( tr_peerMgr *    manager,
1431                    const uint8_t * torrentHash,
1432                    tr_pex **       setme_pex )
1433{
1434    int peerCount = 0;
1435    const Torrent *  t;
1436
1437    managerLock( manager );
1438
1439    t = getExistingTorrent( manager, torrentHash );
1440    if( t == NULL )
1441    {
1442        *setme_pex = NULL;
1443    }
1444    else
1445    {
1446        int i;
1447        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1448        tr_pex * pex = tr_new( tr_pex, peerCount );
1449        tr_pex * walk = pex;
1450
1451        for( i=0; i<peerCount; ++i, ++walk )
1452        {
1453            const tr_peer * peer = peers[i];
1454            const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1455
1456            walk->addr = peer->addr;
1457            walk->port = peer->port;
1458            walk->flags = 0;
1459            if( peerPrefersCrypto( peer ) )
1460                walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1461            if( ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 ) )
1462                walk->flags |= ADDED_F_SEED_FLAG;
1463        }
1464
1465        assert( ( walk - pex ) == peerCount );
1466        qsort( pex, peerCount, sizeof( tr_pex ), tr_pexCompare );
1467        *setme_pex = pex;
1468    }
1469
1470    managerUnlock( manager );
1471    return peerCount;
1472}
1473
1474static int reconnectPulse( void * vtorrent );
1475
1476static int rechokePulse( void * vtorrent );
1477
1478void
1479tr_peerMgrStartTorrent( tr_peerMgr *    manager,
1480                        const uint8_t * torrentHash )
1481{
1482    Torrent * t;
1483
1484    managerLock( manager );
1485
1486    t = getExistingTorrent( manager, torrentHash );
1487
1488    assert( t );
1489    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1490    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1491
1492    if( !t->isRunning )
1493    {
1494        t->isRunning = 1;
1495
1496        t->reconnectTimer = tr_timerNew( t->manager->session,
1497                                         reconnectPulse, t,
1498                                         RECONNECT_PERIOD_MSEC );
1499
1500        t->rechokeTimer = tr_timerNew( t->manager->session,
1501                                       rechokePulse, t,
1502                                       RECHOKE_PERIOD_MSEC );
1503
1504        reconnectPulse( t );
1505
1506        rechokePulse( t );
1507
1508        if( !tr_ptrArrayEmpty( t->webseeds ) )
1509            refillSoon( t );
1510    }
1511
1512    managerUnlock( manager );
1513}
1514
1515static void
1516stopTorrent( Torrent * t )
1517{
1518    assert( torrentIsLocked( t ) );
1519
1520    t->isRunning = 0;
1521    tr_timerFree( &t->rechokeTimer );
1522    tr_timerFree( &t->reconnectTimer );
1523
1524    /* disconnect the peers. */
1525    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)peerDestructor );
1526    tr_ptrArrayClear( t->peers );
1527
1528    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1529     * which removes the handshake from t->outgoingHandshakes... */
1530    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1531        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1532}
1533
1534void
1535tr_peerMgrStopTorrent( tr_peerMgr *    manager,
1536                       const uint8_t * torrentHash )
1537{
1538    managerLock( manager );
1539
1540    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1541
1542    managerUnlock( manager );
1543}
1544
1545void
1546tr_peerMgrAddTorrent( tr_peerMgr * manager,
1547                      tr_torrent * tor )
1548{
1549    Torrent * t;
1550
1551    managerLock( manager );
1552
1553    assert( tor );
1554    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1555
1556    t = torrentConstructor( manager, tor );
1557    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1558
1559    managerUnlock( manager );
1560}
1561
1562void
1563tr_peerMgrRemoveTorrent( tr_peerMgr *    manager,
1564                         const uint8_t * torrentHash )
1565{
1566    Torrent * t;
1567
1568    managerLock( manager );
1569
1570    t = getExistingTorrent( manager, torrentHash );
1571    assert( t );
1572    stopTorrent( t );
1573    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1574    torrentDestructor( t );
1575
1576    managerUnlock( manager );
1577}
1578
1579void
1580tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1581                               const uint8_t *    torrentHash,
1582                               int8_t *           tab,
1583                               unsigned int       tabCount )
1584{
1585    tr_piece_index_t   i;
1586    const Torrent *    t;
1587    const tr_torrent * tor;
1588    float              interval;
1589    int                isSeed;
1590    int                peerCount;
1591    const tr_peer **   peers;
1592
1593    managerLock( manager );
1594
1595    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1596    tor = t->tor;
1597    interval = tor->info.pieceCount / (float)tabCount;
1598    isSeed = tor && ( tr_cpGetStatus ( tor->completion ) == TR_SEED );
1599    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1600
1601    memset( tab, 0, tabCount );
1602
1603    for( i = 0; tor && i < tabCount; ++i )
1604    {
1605        const int piece = i * interval;
1606
1607        if( isSeed || tr_cpPieceIsComplete( tor->completion, piece ) )
1608            tab[i] = -1;
1609        else if( peerCount )
1610        {
1611            int j;
1612            for( j = 0; j < peerCount; ++j )
1613                if( tr_bitfieldHas( peers[j]->have, i ) )
1614                    ++tab[i];
1615        }
1616    }
1617
1618    managerUnlock( manager );
1619}
1620
1621/* Returns the pieces that are available from peers */
1622tr_bitfield*
1623tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1624                        const uint8_t *    torrentHash )
1625{
1626    int           i, size;
1627    Torrent *     t;
1628    tr_peer **    peers;
1629    tr_bitfield * pieces;
1630    managerLock( manager );
1631
1632    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1633    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1634    peers = getConnectedPeers( t, &size );
1635    for( i = 0; i < size; ++i )
1636        tr_bitfieldOr( pieces, peers[i]->have );
1637
1638    managerUnlock( manager );
1639    tr_free( peers );
1640    return pieces;
1641}
1642
1643int
1644tr_peerMgrHasConnections( const tr_peerMgr * manager,
1645                          const uint8_t *    torrentHash )
1646{
1647    int ret;
1648    const Torrent * t;
1649    managerLock( manager );
1650
1651    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1652    ret = t && ( !tr_ptrArrayEmpty( t->peers ) || !tr_ptrArrayEmpty( t->webseeds ) );
1653
1654    managerUnlock( manager );
1655    return ret;
1656}
1657
1658void
1659tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1660                        const uint8_t    * torrentHash,
1661                        int              * setmePeersKnown,
1662                        int              * setmePeersConnected,
1663                        int              * setmeSeedsConnected,
1664                        int              * setmeWebseedsSendingToUs,
1665                        int              * setmePeersSendingToUs,
1666                        int              * setmePeersGettingFromUs,
1667                        int              * setmePeersFrom )
1668{
1669    int i, size;
1670    const Torrent * t;
1671    const tr_peer ** peers;
1672    const tr_webseed ** webseeds;
1673
1674    managerLock( manager );
1675
1676    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1677    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1678
1679    *setmePeersKnown           = tr_ptrArraySize( t->pool );
1680    *setmePeersConnected       = 0;
1681    *setmeSeedsConnected       = 0;
1682    *setmePeersGettingFromUs   = 0;
1683    *setmePeersSendingToUs     = 0;
1684    *setmeWebseedsSendingToUs  = 0;
1685
1686    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1687        setmePeersFrom[i] = 0;
1688
1689    for( i=0; i<size; ++i )
1690    {
1691        const tr_peer * peer = peers[i];
1692        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1693
1694        if( peer->io == NULL ) /* not connected */
1695            continue;
1696
1697        ++*setmePeersConnected;
1698
1699        ++setmePeersFrom[atom->from];
1700
1701        if( clientIsDownloadingFrom( peer ) )
1702            ++*setmePeersSendingToUs;
1703
1704        if( clientIsUploadingTo( peer ) )
1705            ++*setmePeersGettingFromUs;
1706
1707        if( atom->flags & ADDED_F_SEED_FLAG )
1708            ++*setmeSeedsConnected;
1709    }
1710
1711    webseeds = (const tr_webseed **) tr_ptrArrayPeek( t->webseeds, &size );
1712    for( i=0; i<size; ++i )
1713    {
1714        if( tr_webseedIsActive( webseeds[i] ) )
1715            ++*setmeWebseedsSendingToUs;
1716    }
1717
1718    managerUnlock( manager );
1719}
1720
1721float*
1722tr_peerMgrWebSpeeds( const tr_peerMgr * manager,
1723                     const uint8_t *    torrentHash )
1724{
1725    const Torrent * t;
1726    const tr_webseed ** webseeds;
1727    int i;
1728    int webseedCount;
1729    float * ret;
1730
1731    assert( manager );
1732    managerLock( manager );
1733
1734    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1735    webseeds = (const tr_webseed**) tr_ptrArrayPeek( t->webseeds,
1736                                                     &webseedCount );
1737    assert( webseedCount == t->tor->info.webseedCount );
1738    ret = tr_new0( float, webseedCount );
1739
1740    for( i=0; i<webseedCount; ++i )
1741        if( !tr_webseedGetSpeed( webseeds[i], &ret[i] ) )
1742            ret[i] = -1.0;
1743
1744    managerUnlock( manager );
1745    return ret;
1746}
1747
1748double
1749tr_peerGetPieceSpeed( const tr_peer * peer, tr_direction direction )
1750{
1751    assert( peer );
1752    assert( direction==TR_CLIENT_TO_PEER || direction==TR_PEER_TO_CLIENT );
1753
1754    return tr_bandwidthGetPieceSpeed( peer->bandwidth, direction );
1755}
1756
1757
1758struct tr_peer_stat *
1759tr_peerMgrPeerStats( const   tr_peerMgr  * manager,
1760                     const   uint8_t     * torrentHash,
1761                     int                 * setmeCount UNUSED )
1762{
1763    int             i, size;
1764    const Torrent * t;
1765    tr_peer **      peers;
1766    tr_peer_stat *  ret;
1767
1768    assert( manager );
1769    managerLock( manager );
1770
1771    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1772    peers = getConnectedPeers( (Torrent*)t, &size );
1773    ret = tr_new0( tr_peer_stat, size );
1774
1775    for( i = 0; i < size; ++i )
1776    {
1777        char *                   pch;
1778        const tr_peer *          peer = peers[i];
1779        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1780        tr_peer_stat *           stat = ret + i;
1781
1782        tr_ntop( &peer->addr, stat->addr, sizeof(stat->addr) );
1783        tr_strlcpy( stat->client, (peer->client ? peer->client : ""), sizeof(stat->client) );
1784        stat->port               = ntohs( peer->port );
1785        stat->from               = atom->from;
1786        stat->progress           = peer->progress;
1787        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1788        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER );
1789        stat->rateToClient       = tr_peerGetPieceSpeed( peer, TR_PEER_TO_CLIENT );
1790        stat->peerIsChoked       = peer->peerIsChoked;
1791        stat->peerIsInterested   = peer->peerIsInterested;
1792        stat->clientIsChoked     = peer->clientIsChoked;
1793        stat->clientIsInterested = peer->clientIsInterested;
1794        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1795        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1796        stat->isUploadingTo      = clientIsUploadingTo( peer );
1797        stat->isSeed             = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
1798
1799        pch = stat->flagStr;
1800        if( t->optimistic == peer ) *pch++ = 'O';
1801        if( stat->isDownloadingFrom ) *pch++ = 'D';
1802        else if( stat->clientIsInterested ) *pch++ = 'd';
1803        if( stat->isUploadingTo ) *pch++ = 'U';
1804        else if( stat->peerIsInterested ) *pch++ = 'u';
1805        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
1806        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1807        if( stat->isEncrypted ) *pch++ = 'E';
1808        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1809        if( stat->isIncoming ) *pch++ = 'I';
1810        *pch = '\0';
1811    }
1812
1813    *setmeCount = size;
1814    tr_free( peers );
1815
1816    managerUnlock( manager );
1817    return ret;
1818}
1819
1820/**
1821***
1822**/
1823
1824struct ChokeData
1825{
1826    tr_bool         doUnchoke;
1827    tr_bool         isInterested;
1828    tr_bool         isChoked;
1829    int             rate;
1830    tr_peer *       peer;
1831};
1832
1833static int
1834compareChoke( const void * va,
1835              const void * vb )
1836{
1837    const struct ChokeData * a = va;
1838    const struct ChokeData * b = vb;
1839
1840    if( a->rate != b->rate ) /* prefer higher overall speeds */
1841        return a->rate > b->rate ? -1 : 1;
1842
1843    if( a->isChoked != b->isChoked ) /* prefer unchoked */
1844        return a->isChoked ? 1 : -1;
1845
1846    return 0;
1847}
1848
1849static int
1850isNew( const tr_peer * peer )
1851{
1852    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1853}
1854
1855static int
1856isSame( const tr_peer * peer )
1857{
1858    return peer && peer->client && strstr( peer->client, "Transmission" );
1859}
1860
1861/**
1862***
1863**/
1864
1865static void
1866rechoke( Torrent * t )
1867{
1868    int                i, peerCount, size, unchokedInterested;
1869    tr_peer **         peers = getConnectedPeers( t, &peerCount );
1870    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1871    const int          chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
1872
1873    assert( torrentIsLocked( t ) );
1874
1875    /* sort the peers by preference and rate */
1876    for( i = 0, size = 0; i < peerCount; ++i )
1877    {
1878        tr_peer * peer = peers[i];
1879        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1880
1881        if( peer->progress >= 1.0 ) /* choke all seeds */
1882        {
1883            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1884        }
1885        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
1886        {
1887            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1888        }
1889        else if( chokeAll ) /* choke everyone if we're not uploading */
1890        {
1891            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1892        }
1893        else
1894        {
1895            struct ChokeData * n = &choke[size++];
1896            n->peer         = peer;
1897            n->isInterested = peer->peerIsInterested;
1898            n->isChoked     = peer->peerIsChoked;
1899            n->rate         = tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER ) * 1024;
1900        }
1901    }
1902
1903    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
1904
1905    /**
1906     * Reciprocation and number of uploads capping is managed by unchoking
1907     * the N peers which have the best upload rate and are interested.
1908     * This maximizes the client's download rate. These N peers are
1909     * referred to as downloaders, because they are interested in downloading
1910     * from the client.
1911     *
1912     * Peers which have a better upload rate (as compared to the downloaders)
1913     * but aren't interested get unchoked. If they become interested, the
1914     * downloader with the worst upload rate gets choked. If a client has
1915     * a complete file, it uses its upload rate rather than its download
1916     * rate to decide which peers to unchoke.
1917     */
1918    unchokedInterested = 0;
1919    for( i=0; i<size && unchokedInterested<MAX_UNCHOKED_PEERS; ++i ) {
1920        choke[i].doUnchoke = 1;
1921        if( choke[i].isInterested )
1922            ++unchokedInterested;
1923    }
1924
1925    /* optimistic unchoke */
1926    if( i < size )
1927    {
1928        int n;
1929        struct ChokeData * c;
1930        tr_ptrArray * randPool = tr_ptrArrayNew( );
1931
1932        for( ; i<size; ++i )
1933        {
1934            if( choke[i].isInterested )
1935            {
1936                const tr_peer * peer = choke[i].peer;
1937                int x = 1, y;
1938                if( isNew( peer ) ) x *= 3;
1939                if( isSame( peer ) ) x *= 3;
1940                for( y=0; y<x; ++y )
1941                    tr_ptrArrayAppend( randPool, &choke[i] );
1942            }
1943        }
1944
1945        if(( n = tr_ptrArraySize( randPool )))
1946        {
1947            c = tr_ptrArrayNth( randPool, tr_cryptoWeakRandInt( n ));
1948            c->doUnchoke = 1;
1949            t->optimistic = c->peer;
1950        }
1951
1952        tr_ptrArrayFree( randPool, NULL );
1953    }
1954
1955    for( i = 0; i < size; ++i )
1956        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1957
1958    /* cleanup */
1959    tr_free( choke );
1960    tr_free( peers );
1961}
1962
1963static int
1964rechokePulse( void * vtorrent )
1965{
1966    Torrent * t = vtorrent;
1967
1968    torrentLock( t );
1969    rechoke( t );
1970    torrentUnlock( t );
1971    return TRUE;
1972}
1973
1974/***
1975****
1976****  Life and Death
1977****
1978***/
1979
1980static int
1981shouldPeerBeClosed( const Torrent * t,
1982                    const tr_peer * peer,
1983                    int             peerCount )
1984{
1985    const tr_torrent *       tor = t->tor;
1986    const time_t             now = time( NULL );
1987    const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1988
1989    /* if it's marked for purging, close it */
1990    if( peer->doPurge )
1991    {
1992        tordbg( t, "purging peer %s because its doPurge flag is set",
1993                tr_peerIoAddrStr( &atom->addr, atom->port ) );
1994        return TRUE;
1995    }
1996
1997    /* if we're seeding and the peer has everything we have,
1998     * and enough time has passed for a pex exchange, then disconnect */
1999    if( tr_torrentIsSeed( tor ) )
2000    {
2001        int peerHasEverything;
2002        if( atom->flags & ADDED_F_SEED_FLAG )
2003            peerHasEverything = TRUE;
2004        else if( peer->progress < tr_cpPercentDone( tor->completion ) )
2005            peerHasEverything = FALSE;
2006        else {
2007            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( tor->completion ) );
2008            tr_bitfieldDifference( tmp, peer->have );
2009            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2010            tr_bitfieldFree( tmp );
2011        }
2012        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 ))) {
2013            tordbg( t, "purging peer %s because we're both seeds",
2014                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2015            return TRUE;
2016        }
2017    }
2018
2019    /* disconnect if it's been too long since piece data has been transferred.
2020     * this is on a sliding scale based on number of available peers... */
2021    {
2022        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2023        /* if we have >= relaxIfFewerThan, strictness is 100%.
2024         * if we have zero connections, strictness is 0% */
2025        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
2026            ? 1.0
2027            : peerCount / (float)relaxStrictnessIfFewerThanN;
2028        const int lo = MIN_UPLOAD_IDLE_SECS;
2029        const int hi = MAX_UPLOAD_IDLE_SECS;
2030        const int limit = lo + ( ( hi - lo ) * strictness );
2031        const time_t then = peer->pieceDataActivityDate;
2032        const int idleTime = then ? ( now - then ) : 0;
2033        if( idleTime > limit ) {
2034            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
2035                       tr_peerIoAddrStr( &atom->addr, atom->port ), idleTime );
2036            return TRUE;
2037        }
2038    }
2039
2040    return FALSE;
2041}
2042
2043static tr_peer **
2044getPeersToClose( Torrent * t, int * setmeSize )
2045{
2046    int i, peerCount, outsize;
2047    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( t->peers, &peerCount );
2048    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2049
2050    assert( torrentIsLocked( t ) );
2051
2052    for( i = outsize = 0; i < peerCount; ++i )
2053        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
2054            ret[outsize++] = peers[i];
2055
2056    *setmeSize = outsize;
2057    return ret;
2058}
2059
2060static int
2061compareCandidates( const void * va,
2062                   const void * vb )
2063{
2064    const struct peer_atom * a = *(const struct peer_atom**) va;
2065    const struct peer_atom * b = *(const struct peer_atom**) vb;
2066
2067    /* <Charles> Here we would probably want to try reconnecting to
2068     * peers that had most recently given us data. Lots of users have
2069     * trouble with resets due to their routers and/or ISPs. This way we
2070     * can quickly recover from an unwanted reset. So we sort
2071     * piece_data_time in descending order.
2072     */
2073
2074    if( a->piece_data_time != b->piece_data_time )
2075        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2076
2077    if( a->numFails != b->numFails )
2078        return a->numFails < b->numFails ? -1 : 1;
2079
2080    if( a->time != b->time )
2081        return a->time < b->time ? -1 : 1;
2082
2083    /* all other things being equal, prefer peers whose
2084     * information comes from a more reliable source */
2085    if( a->from != b->from )
2086        return a->from < b->from ? -1 : 1;
2087
2088    return 0;
2089}
2090
2091static int
2092getReconnectIntervalSecs( const struct peer_atom * atom )
2093{
2094    int          sec;
2095    const time_t now = time( NULL );
2096
2097    /* if we were recently connected to this peer and transferring piece
2098     * data, try to reconnect to them sooner rather that later -- we don't
2099     * want network troubles to get in the way of a good peer. */
2100    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2101        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2102
2103    /* don't allow reconnects more often than our minimum */
2104    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2105        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2106
2107    /* otherwise, the interval depends on how many times we've tried
2108     * and failed to connect to the peer */
2109    else switch( atom->numFails ) {
2110        case 0: sec = 0; break;
2111        case 1: sec = 5; break;
2112        case 2: sec = 2 * 60; break;
2113        case 3: sec = 15 * 60; break;
2114        case 4: sec = 30 * 60; break;
2115        case 5: sec = 60 * 60; break;
2116        default: sec = 120 * 60; break;
2117    }
2118
2119    return sec;
2120}
2121
2122static struct peer_atom **
2123getPeerCandidates( Torrent * t, int * setmeSize )
2124{
2125    int                 i, atomCount, retCount;
2126    struct peer_atom ** atoms;
2127    struct peer_atom ** ret;
2128    const time_t        now = time( NULL );
2129    const int           seed = tr_torrentIsSeed( t->tor );
2130
2131    assert( torrentIsLocked( t ) );
2132
2133    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &atomCount );
2134    ret = tr_new( struct peer_atom*, atomCount );
2135    for( i = retCount = 0; i < atomCount; ++i )
2136    {
2137        int                interval;
2138        struct peer_atom * atom = atoms[i];
2139
2140        /* peer fed us too much bad data ... we only keep it around
2141         * now to weed it out in case someone sends it to us via pex */
2142        if( atom->myflags & MYFLAG_BANNED )
2143            continue;
2144
2145        /* peer was unconnectable before, so we're not going to keep trying.
2146         * this is needs a separate flag from `banned', since if they try
2147         * to connect to us later, we'll let them in */
2148        if( atom->myflags & MYFLAG_UNREACHABLE )
2149            continue;
2150
2151        /* we don't need two connections to the same peer... */
2152        if( peerIsInUse( t, &atom->addr ) )
2153            continue;
2154
2155        /* no need to connect if we're both seeds... */
2156        if( seed && ( ( atom->flags & ADDED_F_SEED_FLAG ) ||
2157                      ( atom->uploadOnly == UPLOAD_ONLY_YES ) ) )
2158            continue;
2159
2160        /* don't reconnect too often */
2161        interval = getReconnectIntervalSecs( atom );
2162        if( ( now - atom->time ) < interval )
2163        {
2164            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2165                    i, tr_peerIoAddrStr( &atom->addr, atom->port ), interval );
2166            continue;
2167        }
2168
2169        /* Don't connect to peers in our blocklist */
2170        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2171            continue;
2172
2173        ret[retCount++] = atom;
2174    }
2175
2176    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2177    *setmeSize = retCount;
2178    return ret;
2179}
2180
2181static int
2182reconnectPulse( void * vtorrent )
2183{
2184    Torrent *     t = vtorrent;
2185    static time_t prevTime = 0;
2186    static int    newConnectionsThisSecond = 0;
2187    time_t        now;
2188
2189    torrentLock( t );
2190
2191    now = time( NULL );
2192    if( prevTime != now )
2193    {
2194        prevTime = now;
2195        newConnectionsThisSecond = 0;
2196    }
2197
2198    if( !t->isRunning )
2199    {
2200        removeAllPeers( t );
2201    }
2202    else
2203    {
2204        int i, nCandidates, nBad;
2205        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
2206        struct tr_peer ** connections = getPeersToClose( t, &nBad );
2207
2208        if( nBad || nCandidates )
2209            tordbg( t, "reconnect pulse for [%s]: %d bad connections, "
2210                   "%d connection candidates, %d atoms, max per pulse is %d",
2211                   t->tor->info.name, nBad, nCandidates,
2212                   tr_ptrArraySize( t->pool ),
2213                   (int)MAX_RECONNECTIONS_PER_PULSE );
2214
2215        /* disconnect some peers.
2216           if we transferred piece data, then they might be good peers,
2217           so reset their `numFails' weight to zero.  otherwise we connected
2218           to them fruitlessly, so mark it as another fail */
2219        for( i = 0; i < nBad; ++i ) {
2220            tr_peer * peer = connections[i];
2221            struct peer_atom * atom = getExistingAtom( t, &peer->addr );
2222            if( peer->pieceDataActivityDate )
2223                atom->numFails = 0;
2224            else
2225                ++atom->numFails;
2226            tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
2227            removePeer( t, peer );
2228        }
2229
2230        /* add some new ones */
2231        for( i = 0;    ( i < nCandidates )
2232           && ( i < MAX_RECONNECTIONS_PER_PULSE )
2233           && ( getPeerCount( t ) < getMaxPeerCount( t->tor ) )
2234           && ( newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND ); ++i )
2235        {
2236            tr_peerMgr *       mgr = t->manager;
2237            struct peer_atom * atom = candidates[i];
2238            tr_peerIo *        io;
2239
2240            tordbg( t, "Starting an OUTGOING connection with %s",
2241                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2242
2243            io = tr_peerIoNewOutgoing( mgr->session, &atom->addr, atom->port, t->hash );
2244            if( io == NULL )
2245            {
2246                atom->myflags |= MYFLAG_UNREACHABLE;
2247            }
2248            else
2249            {
2250                tr_handshake * handshake = tr_handshakeNew( io,
2251                                                            mgr->session->encryptionMode,
2252                                                            myHandshakeDoneCB,
2253                                                            mgr );
2254
2255                assert( tr_peerIoGetTorrentHash( io ) );
2256
2257                ++newConnectionsThisSecond;
2258
2259                tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake,
2260                                         handshakeCompare );
2261            }
2262
2263            atom->time = time( NULL );
2264        }
2265
2266        /* cleanup */
2267        tr_free( connections );
2268        tr_free( candidates );
2269    }
2270
2271    torrentUnlock( t );
2272    return TRUE;
2273}
2274
2275/****
2276*****
2277*****  BANDWIDTH ALLOCATION
2278*****
2279****/
2280
2281static void
2282pumpAllPeers( tr_peerMgr * mgr )
2283{
2284    const int torrentCount = tr_ptrArraySize( mgr->torrents );
2285    int       i, j;
2286
2287    for( i=0; i<torrentCount; ++i )
2288    {
2289        Torrent * t = tr_ptrArrayNth( mgr->torrents, i );
2290        for( j=0; j<tr_ptrArraySize( t->peers ); ++j )
2291        {
2292            tr_peer * peer = tr_ptrArrayNth( t->peers, j );
2293            tr_peerMsgsPulse( peer->msgs );
2294        }
2295    }
2296}
2297
2298static int
2299bandwidthPulse( void * vmgr )
2300{
2301    tr_peerMgr * mgr = vmgr;
2302    managerLock( mgr );
2303
2304    pumpAllPeers( mgr );
2305    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
2306    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
2307    pumpAllPeers( mgr );
2308
2309    managerUnlock( mgr );
2310    return TRUE;
2311}
Note: See TracBrowser for help on using the repository browser.