source: trunk/libtransmission/peer-mgr.c @ 7234

Last change on this file since 7234 was 7234, checked in by charles, 12 years ago

(libT) #1549: support fast exensions' "reject" and "have all/none" messages

  • Property svn:keywords set to Date Rev Author Id
File size: 64.7 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 7234 2008-12-02 17:10:54Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h> /* memcpy, memcmp, strstr */
16#include <stdlib.h> /* qsort */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "bandwidth.h"
23#include "bencode.h"
24#include "blocklist.h"
25#include "clients.h"
26#include "completion.h"
27#include "crypto.h"
28#include "handshake.h"
29#include "inout.h" /* tr_ioTestPiece */
30#include "net.h"
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-mgr-private.h"
34#include "peer-msgs.h"
35#include "ptrarray.h"
36#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
37#include "torrent.h"
38#include "trevent.h"
39#include "utils.h"
40#include "webseed.h"
41
42enum
43{
44    /* how frequently to change which peers are choked */
45    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
46
47    /* minimum interval for refilling peers' request lists */
48    REFILL_PERIOD_MSEC = 333,
49
50    /* when many peers are available, keep idle ones this long */
51    MIN_UPLOAD_IDLE_SECS = ( 60 * 3 ),
52
53    /* when few peers are available, keep idle ones this long */
54    MAX_UPLOAD_IDLE_SECS = ( 60 * 10 ),
55
56    /* how frequently to decide which peers live and die */
57    RECONNECT_PERIOD_MSEC = ( 2 * 1000 ),
58   
59    /* how frequently to reallocate bandwidth */
60    BANDWIDTH_PERIOD_MSEC = 250,
61
62    /* max # of peers to ask fer per torrent per reconnect pulse */
63    MAX_RECONNECTIONS_PER_PULSE = 4,
64
65    /* max number of peers to ask for per second overall.
66    * this throttle is to avoid overloading the router */
67    MAX_CONNECTIONS_PER_SECOND = 8,
68
69    /* number of unchoked peers per torrent.
70     * FIXME: this probably ought to be configurable */
71    MAX_UNCHOKED_PEERS = 14,
72
73    /* number of bad pieces a peer is allowed to send before we ban them */
74    MAX_BAD_PIECES_PER_PEER = 5,
75
76    /* use for bitwise operations w/peer_atom.myflags */
77    MYFLAG_BANNED = 1,
78
79    /* unreachable for now... but not banned.
80     * if they try to connect to us it's okay */
81    MYFLAG_UNREACHABLE = 2,
82
83    /* the minimum we'll wait before attempting to reconnect to a peer */
84    MINIMUM_RECONNECT_INTERVAL_SECS = 5
85};
86
87
88/**
89***
90**/
91
92/* We keep one of these for every peer we know about, whether
93 * it's connected or not, so the struct must be small.
94 * When our current connections underperform, we dip back
95 * into this list for new ones. */
96struct peer_atom
97{
98    uint8_t     from;
99    uint8_t     flags; /* these match the added_f flags */
100    uint8_t     myflags; /* flags that aren't defined in added_f */
101    tr_port     port;
102    uint16_t    numFails;
103    tr_address  addr;
104    time_t      time; /* when the peer's connection status last changed */
105    time_t      piece_data_time;
106};
107
108typedef struct
109{
110    tr_bool         isRunning;
111
112    uint8_t         hash[SHA_DIGEST_LENGTH];
113    int         *   pendingRequestCount;
114    tr_ptrArray *   outgoingHandshakes; /* tr_handshake */
115    tr_ptrArray *   pool; /* struct peer_atom */
116    tr_ptrArray *   peers; /* tr_peer */
117    tr_ptrArray *   webseeds; /* tr_webseed */
118    tr_timer *      reconnectTimer;
119    tr_timer *      rechokeTimer;
120    tr_timer *      refillTimer;
121    tr_torrent *    tor;
122    tr_peer *       optimistic; /* the optimistic peer, or NULL if none */
123
124    struct tr_peerMgr * manager;
125}
126Torrent;
127
128struct tr_peerMgr
129{
130    tr_session      * session;
131    tr_ptrArray     * torrents; /* Torrent */
132    tr_ptrArray     * incomingHandshakes; /* tr_handshake */
133    tr_timer        * bandwidthTimer;
134};
135
136#define tordbg( t, ... ) \
137    do { \
138        if( tr_deepLoggingIsActive( ) ) \
139            tr_deepLog( __FILE__, __LINE__, t->tor->info.name, __VA_ARGS__ ); \
140    } while( 0 )
141
142#define dbgmsg( ... ) \
143    do { \
144        if( tr_deepLoggingIsActive( ) ) \
145            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
146    } while( 0 )
147
148/**
149***
150**/
151
152static void
153managerLock( const struct tr_peerMgr * manager )
154{
155    tr_globalLock( manager->session );
156}
157
158static void
159managerUnlock( const struct tr_peerMgr * manager )
160{
161    tr_globalUnlock( manager->session );
162}
163
164static void
165torrentLock( Torrent * torrent )
166{
167    managerLock( torrent->manager );
168}
169
170static void
171torrentUnlock( Torrent * torrent )
172{
173    managerUnlock( torrent->manager );
174}
175
176static int
177torrentIsLocked( const Torrent * t )
178{
179    return tr_globalIsLocked( t->manager->session );
180}
181
182/**
183***
184**/
185
186static int
187handshakeCompareToAddr( const void * va,
188                        const void * vb )
189{
190    const tr_handshake * a = va;
191
192    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
193}
194
195static int
196handshakeCompare( const void * a,
197                  const void * b )
198{
199    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
200}
201
202static tr_handshake*
203getExistingHandshake( tr_ptrArray      * handshakes,
204                      const tr_address * addr )
205{
206    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
207}
208
209static int
210comparePeerAtomToAddress( const void * va, const void * vb )
211{
212    const struct peer_atom * a = va;
213
214    return tr_compareAddresses( &a->addr, vb );
215}
216
217static int
218comparePeerAtoms( const void * va, const void * vb )
219{
220    const struct peer_atom * b = vb;
221
222    return comparePeerAtomToAddress( va, &b->addr );
223}
224
225/**
226***
227**/
228
229static int
230torrentCompare( const void * va,
231                const void * vb )
232{
233    const Torrent * a = va;
234    const Torrent * b = vb;
235
236    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
237}
238
239static int
240torrentCompareToHash( const void * va,
241                      const void * vb )
242{
243    const Torrent * a = va;
244    const uint8_t * b_hash = vb;
245
246    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
247}
248
249static Torrent*
250getExistingTorrent( tr_peerMgr *    manager,
251                    const uint8_t * hash )
252{
253    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
254                                             hash,
255                                             torrentCompareToHash );
256}
257
258static int
259peerCompare( const void * va, const void * vb )
260{
261    const tr_peer * a = va;
262    const tr_peer * b = vb;
263
264    return tr_compareAddresses( &a->addr, &b->addr );
265}
266
267static int
268peerCompareToAddr( const void * va, const void * vb )
269{
270    const tr_peer * a = va;
271
272    return tr_compareAddresses( &a->addr, vb );
273}
274
275static tr_peer*
276getExistingPeer( Torrent          * torrent,
277                 const tr_address * addr )
278{
279    assert( torrentIsLocked( torrent ) );
280    assert( addr );
281
282    return tr_ptrArrayFindSorted( torrent->peers, addr, peerCompareToAddr );
283}
284
285static struct peer_atom*
286getExistingAtom( const Torrent    * t,
287                 const tr_address * addr )
288{
289    assert( torrentIsLocked( t ) );
290    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
291}
292
293static int
294peerIsInUse( const Torrent    * ct,
295             const tr_address * addr )
296{
297    Torrent * t = (Torrent*) ct;
298
299    assert( torrentIsLocked ( t ) );
300
301    return getExistingPeer( t, addr )
302           || getExistingHandshake( t->outgoingHandshakes, addr )
303           || getExistingHandshake( t->manager->incomingHandshakes, addr );
304}
305
306static tr_peer*
307peerConstructor( tr_torrent * tor, const tr_address * addr )
308{
309    tr_peer * p;
310
311    p = tr_new0( tr_peer, 1 );
312    p->addr = *addr;
313    p->bandwidth = tr_bandwidthNew( tor->session, tor->bandwidth );
314    return p;
315}
316
317static tr_peer*
318getPeer( Torrent          * torrent,
319         const tr_address * addr )
320{
321    tr_peer * peer;
322
323    assert( torrentIsLocked( torrent ) );
324
325    peer = getExistingPeer( torrent, addr );
326
327    if( peer == NULL )
328    {
329        peer = peerConstructor( torrent->tor, addr );
330        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
331    }
332
333    return peer;
334}
335
336static void
337peerDestructor( tr_peer * peer )
338{
339    assert( peer );
340
341    if( peer->msgs != NULL )
342    {
343        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
344        tr_peerMsgsFree( peer->msgs );
345    }
346
347    tr_peerIoFree( peer->io );
348
349    tr_bitfieldFree( peer->have );
350    tr_bitfieldFree( peer->blame );
351    tr_free( peer->client );
352
353    tr_bandwidthFree( peer->bandwidth );
354
355    tr_free( peer );
356}
357
358static void
359removePeer( Torrent * t,
360            tr_peer * peer )
361{
362    tr_peer *          removed;
363    struct peer_atom * atom;
364
365    assert( torrentIsLocked( t ) );
366
367    atom = getExistingAtom( t, &peer->addr );
368    assert( atom );
369    atom->time = time( NULL );
370
371    removed = tr_ptrArrayRemoveSorted( t->peers, peer, peerCompare );
372    assert( removed == peer );
373    peerDestructor( removed );
374}
375
376static void
377removeAllPeers( Torrent * t )
378{
379    while( !tr_ptrArrayEmpty( t->peers ) )
380        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
381}
382
383static void
384torrentDestructor( void * vt )
385{
386    Torrent * t = vt;
387    uint8_t   hash[SHA_DIGEST_LENGTH];
388
389    assert( t );
390    assert( !t->isRunning );
391    assert( t->peers );
392    assert( torrentIsLocked( t ) );
393    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
394    assert( tr_ptrArrayEmpty( t->peers ) );
395
396    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
397
398    tr_timerFree( &t->reconnectTimer );
399    tr_timerFree( &t->rechokeTimer );
400    tr_timerFree( &t->refillTimer );
401
402    tr_ptrArrayFree( t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
403    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
404    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
405    tr_ptrArrayFree( t->peers, NULL );
406
407    tr_free( t->pendingRequestCount );
408    tr_free( t );
409}
410
411static void peerCallbackFunc( void * vpeer,
412                              void * vevent,
413                              void * vt );
414
415static Torrent*
416torrentConstructor( tr_peerMgr * manager,
417                    tr_torrent * tor )
418{
419    int       i;
420    Torrent * t;
421
422    t = tr_new0( Torrent, 1 );
423    t->manager = manager;
424    t->tor = tor;
425    t->pool = tr_ptrArrayNew( );
426    t->peers = tr_ptrArrayNew( );
427    t->webseeds = tr_ptrArrayNew( );
428    t->outgoingHandshakes = tr_ptrArrayNew( );
429    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
430
431    for( i = 0; i < tor->info.webseedCount; ++i )
432    {
433        tr_webseed * w =
434            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc,
435                           t );
436        tr_ptrArrayAppend( t->webseeds, w );
437    }
438
439    return t;
440}
441
442
443static int bandwidthPulse( void * vmgr );
444
445
446tr_peerMgr*
447tr_peerMgrNew( tr_session * session )
448{
449    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
450
451    m->session = session;
452    m->torrents = tr_ptrArrayNew( );
453    m->incomingHandshakes = tr_ptrArrayNew( );
454    m->bandwidthTimer = tr_timerNew( session, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
455    return m;
456}
457
458void
459tr_peerMgrFree( tr_peerMgr * manager )
460{
461    managerLock( manager );
462
463    tr_timerFree( &manager->bandwidthTimer );
464
465    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
466     * the item from manager->handshakes, so this is a little roundabout... */
467    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
468        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
469
470    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
471
472    /* free the torrents. */
473    tr_ptrArrayFree( manager->torrents, torrentDestructor );
474
475    managerUnlock( manager );
476    tr_free( manager );
477}
478
479static tr_peer**
480getConnectedPeers( Torrent * t,
481                   int *     setmeCount )
482{
483    int       i, peerCount, connectionCount;
484    tr_peer **peers;
485    tr_peer **ret;
486
487    assert( torrentIsLocked( t ) );
488
489    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
490    ret = tr_new( tr_peer *, peerCount );
491
492    for( i = connectionCount = 0; i < peerCount; ++i )
493        if( peers[i]->msgs )
494            ret[connectionCount++] = peers[i];
495
496    *setmeCount = connectionCount;
497    return ret;
498}
499
500static int
501clientIsDownloadingFrom( const tr_peer * peer )
502{
503    return peer->clientIsInterested && !peer->clientIsChoked;
504}
505
506static int
507clientIsUploadingTo( const tr_peer * peer )
508{
509    return peer->peerIsInterested && !peer->peerIsChoked;
510}
511
512/***
513****
514***/
515
516int
517tr_peerMgrPeerIsSeed( const tr_peerMgr * mgr,
518                      const uint8_t    * torrentHash,
519                      const tr_address * addr )
520{
521    int                      isSeed = FALSE;
522    const Torrent *          t = NULL;
523    const struct peer_atom * atom = NULL;
524
525    t = getExistingTorrent( (tr_peerMgr*)mgr, torrentHash );
526    if( t )
527        atom = getExistingAtom( t, addr );
528    if( atom )
529        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
530
531    return isSeed;
532}
533
534/****
535*****
536*****  REFILL
537*****
538****/
539
540static void
541assertValidPiece( Torrent * t, tr_piece_index_t piece )
542{
543    assert( t );
544    assert( t->tor );
545    assert( piece < t->tor->info.pieceCount );
546}
547
548static int
549getPieceRequests( Torrent * t, tr_piece_index_t piece )
550{
551    assertValidPiece( t, piece );
552
553    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
554}
555
556static void
557incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
558{
559    assertValidPiece( t, piece );
560
561    if( t->pendingRequestCount == NULL )
562        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
563    t->pendingRequestCount[piece]++;
564}
565
566static void
567decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
568{
569    assertValidPiece( t, piece );
570
571    if( t->pendingRequestCount )
572        t->pendingRequestCount[piece]--;
573}
574
575struct tr_refill_piece
576{
577    tr_priority_t    priority;
578    uint32_t         piece;
579    uint32_t         peerCount;
580    int              random;
581    int              pendingRequestCount;
582    int              missingBlockCount;
583};
584
585static int
586compareRefillPiece( const void * aIn,
587                    const void * bIn )
588{
589    const struct tr_refill_piece * a = aIn;
590    const struct tr_refill_piece * b = bIn;
591
592    /* if one piece has a higher priority, it goes first */
593    if( a->priority != b->priority )
594        return a->priority > b->priority ? -1 : 1;
595
596    /* have a per-priority endgame */
597    if( a->pendingRequestCount != b->pendingRequestCount )
598        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
599
600    /* fewer missing pieces goes first */
601    if( a->missingBlockCount != b->missingBlockCount )
602        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
603
604    /* otherwise if one has fewer peers, it goes first */
605    if( a->peerCount != b->peerCount )
606        return a->peerCount < b->peerCount ? -1 : 1;
607
608    /* otherwise go with our random seed */
609    if( a->random != b->random )
610        return a->random < b->random ? -1 : 1;
611
612    return 0;
613}
614
615static tr_piece_index_t *
616getPreferredPieces( Torrent           * t,
617                    tr_piece_index_t  * pieceCount )
618{
619    const tr_torrent  * tor = t->tor;
620    const tr_info     * inf = &tor->info;
621    tr_piece_index_t    i;
622    tr_piece_index_t    poolSize = 0;
623    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
624    int                 peerCount;
625    tr_peer**           peers;
626
627    assert( torrentIsLocked( t ) );
628
629    peers = getConnectedPeers( t, &peerCount );
630
631    /* make a list of the pieces that we want but don't have */
632    for( i = 0; i < inf->pieceCount; ++i )
633        if( !tor->info.pieces[i].dnd
634                && !tr_cpPieceIsComplete( tor->completion, i ) )
635            pool[poolSize++] = i;
636
637    /* sort the pool by which to request next */
638    if( poolSize > 1 )
639    {
640        tr_piece_index_t j;
641        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
642
643        for( j = 0; j < poolSize; ++j )
644        {
645            int k;
646            const tr_piece_index_t piece = pool[j];
647            struct tr_refill_piece * setme = p + j;
648
649            setme->piece = piece;
650            setme->priority = inf->pieces[piece].priority;
651            setme->peerCount = 0;
652            setme->random = tr_cryptoWeakRandInt( INT_MAX );
653            setme->pendingRequestCount = getPieceRequests( t, piece );
654            setme->missingBlockCount
655                         = tr_cpMissingBlocksInPiece( tor->completion, piece );
656
657            for( k = 0; k < peerCount; ++k )
658            {
659                const tr_peer * peer = peers[k];
660                if( peer->peerIsInterested
661                        && !peer->clientIsChoked
662                        && tr_bitfieldHas( peer->have, piece ) )
663                    ++setme->peerCount;
664            }
665        }
666
667        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
668               compareRefillPiece );
669
670        for( j = 0; j < poolSize; ++j )
671            pool[j] = p[j].piece;
672
673        tr_free( p );
674    }
675
676    tr_free( peers );
677
678    *pieceCount = poolSize;
679    return pool;
680}
681
682struct tr_blockIterator
683{
684    Torrent * t;
685    tr_block_index_t blockIndex, blockCount, *blocks;
686    tr_piece_index_t pieceIndex, pieceCount, *pieces;
687};
688
689static struct tr_blockIterator*
690blockIteratorNew( Torrent * t )
691{
692    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
693    i->t = t;
694    i->pieces = getPreferredPieces( t, &i->pieceCount );
695    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCount );
696    return i;
697}
698
699static int
700blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
701{
702    int found;
703    Torrent * t = i->t;
704    tr_torrent * tor = t->tor;
705
706    while( ( i->blockIndex == i->blockCount )
707        && ( i->pieceIndex < i->pieceCount ) )
708    {
709        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
710        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
711        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
712        tr_block_index_t block;
713
714        assert( index < tor->info.pieceCount );
715
716        i->blockCount = 0;
717        i->blockIndex = 0;
718        for( block=b; block!=e; ++block )
719            if( !tr_cpBlockIsComplete( tor->completion, block ) )
720                i->blocks[i->blockCount++] = block;
721    }
722
723    if(( found = ( i->blockIndex < i->blockCount )))
724        *setme = i->blocks[i->blockIndex++];
725
726    return found;
727}
728
729static void
730blockIteratorFree( struct tr_blockIterator * i )
731{
732    tr_free( i->blocks );
733    tr_free( i->pieces );
734    tr_free( i );
735}
736
737static tr_peer**
738getPeersUploadingToClient( Torrent * t,
739                           int *     setmeCount )
740{
741    int j;
742    int peerCount = 0;
743    int retCount = 0;
744    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
745    tr_peer ** ret = tr_new( tr_peer *, peerCount );
746
747    j = 0; /* this is a temporary test to make sure we walk through all the peers */
748    if( peerCount )
749    {
750        /* Get a list of peers we're downloading from.
751           Pick a different starting point each time so all peers
752           get a chance at being the first in line */
753        const int fencepost = tr_cryptoWeakRandInt( peerCount );
754        int i = fencepost;
755        do {
756            if( clientIsDownloadingFrom( peers[i] ) )
757                ret[retCount++] = peers[i];
758            i = ( i + 1 ) % peerCount;
759            ++j;
760        } while( i != fencepost );
761    }
762    assert( j == peerCount );
763    *setmeCount = retCount;
764    return ret;
765}
766
767static uint32_t
768getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
769{
770    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
771    const uint64_t blockPos = tor->blockSize * b;
772    assert( blockPos >= piecePos );
773    return (uint32_t)( blockPos - piecePos );
774}
775
776static int
777refillPulse( void * vtorrent )
778{
779    tr_block_index_t block;
780    int peerCount;
781    int webseedCount;
782    tr_peer ** peers;
783    tr_webseed ** webseeds;
784    struct tr_blockIterator * blockIterator;
785    Torrent * t = vtorrent;
786    tr_torrent * tor = t->tor;
787
788    if( !t->isRunning )
789        return TRUE;
790    if( tr_torrentIsSeed( t->tor ) )
791        return TRUE;
792
793    torrentLock( t );
794    tordbg( t, "Refilling Request Buffers..." );
795
796    blockIterator = blockIteratorNew( t );
797    peers = getPeersUploadingToClient( t, &peerCount );
798    webseedCount = tr_ptrArraySize( t->webseeds );
799    webseeds = tr_memdup( tr_ptrArrayBase( t->webseeds ),
800                          webseedCount * sizeof( tr_webseed* ) );
801
802    while( ( webseedCount || peerCount )
803        && blockIteratorNext( blockIterator, &block ) )
804    {
805        int j;
806        int handled = FALSE;
807
808        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
809        const uint32_t offset = getBlockOffsetInPiece( tor, block );
810        const uint32_t length = tr_torBlockCountBytes( tor, block );
811
812        /* find a peer who can ask for this block */
813        for( j=0; !handled && j<peerCount; )
814        {
815            const int val = tr_peerMsgsAddRequest( peers[j]->msgs, index, offset, length, FALSE );
816            switch( val )
817            {
818                case TR_ADDREQ_FULL:
819                case TR_ADDREQ_CLIENT_CHOKED:
820                    peers[j] = peers[--peerCount];
821                    break;
822
823                case TR_ADDREQ_MISSING:
824                case TR_ADDREQ_DUPLICATE:
825                    ++j;
826                    break;
827
828                case TR_ADDREQ_OK:
829                    incrementPieceRequests( t, index );
830                    handled = TRUE;
831                    break;
832
833                default:
834                    assert( 0 && "unhandled value" );
835                    break;
836            }
837        }
838
839        /* maybe one of the webseeds can do it */
840        for( j=0; !handled && j<webseedCount; )
841        {
842            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j],
843                                                          index, offset, length );
844            switch( val )
845            {
846                case TR_ADDREQ_FULL:
847                    webseeds[j] = webseeds[--webseedCount];
848                    break;
849
850                case TR_ADDREQ_OK:
851                    incrementPieceRequests( t, index );
852                    handled = TRUE;
853                    break;
854
855                default:
856                    assert( 0 && "unhandled value" );
857                    break;
858            }
859        }
860    }
861
862    /* cleanup */
863    blockIteratorFree( blockIterator );
864    tr_free( webseeds );
865    tr_free( peers );
866
867    t->refillTimer = NULL;
868    torrentUnlock( t );
869    return FALSE;
870}
871
872static void
873broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
874{
875    int i, size;
876    tr_peer ** peers;
877
878    assert( torrentIsLocked( t ) );
879
880    peers = getConnectedPeers( t, &size );
881    for( i=0; i<size; ++i )
882        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
883    tr_free( peers );
884}
885
886static void
887addStrike( Torrent * t,
888           tr_peer * peer )
889{
890    tordbg( t, "increasing peer %s strike count to %d",
891            tr_peerIoAddrStr( &peer->addr,
892                              peer->port ), peer->strikes + 1 );
893
894    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
895    {
896        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
897        atom->myflags |= MYFLAG_BANNED;
898        peer->doPurge = 1;
899        tordbg( t, "banning peer %s",
900               tr_peerIoAddrStr( &atom->addr, atom->port ) );
901    }
902}
903
904static void
905gotBadPiece( Torrent *        t,
906             tr_piece_index_t pieceIndex )
907{
908    tr_torrent *   tor = t->tor;
909    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
910
911    tor->corruptCur += byteCount;
912    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
913}
914
915static void
916refillSoon( Torrent * t )
917{
918    if( t->refillTimer == NULL )
919        t->refillTimer = tr_timerNew( t->manager->session,
920                                      refillPulse, t,
921                                      REFILL_PERIOD_MSEC );
922}
923
924static void
925peerSuggestedPiece( Torrent            * t,
926                    tr_peer            * peer,
927                    tr_piece_index_t     pieceIndex,
928                    int                  isFastAllowed )
929{
930    assert( t );
931    assert( peer );
932    assert( peer->msgs );
933
934    /* is this a valid piece? */
935    if(  pieceIndex >= t->tor->info.pieceCount )
936        return;
937
938    /* don't ask for it if we've already got it */
939    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
940        return;
941
942    /* don't ask for it if they don't have it */
943    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
944        return;
945
946    /* don't ask for it if we're choked and it's not fast */
947    if( !isFastAllowed && peer->clientIsChoked )
948        return;
949
950    /* request the blocks that we don't have in this piece */
951    {
952        tr_block_index_t block;
953        const tr_torrent * tor = t->tor;
954        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
955        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
956
957        for( block=start; block<end; ++block )
958        {
959            if( !tr_cpBlockIsComplete( tor->completion, block ) )
960            {
961                const uint32_t offset = getBlockOffsetInPiece( tor, block );
962                const uint32_t length = tr_torBlockCountBytes( tor, block );
963                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length, TRUE );
964                incrementPieceRequests( t, pieceIndex );
965            }
966        }
967    }
968}
969
970static void
971peerCallbackFunc( void * vpeer,
972                  void * vevent,
973                  void * vt )
974{
975    tr_peer *             peer = vpeer; /* may be NULL if peer is a webseed */
976    Torrent *             t = (Torrent *) vt;
977    const tr_peer_event * e = vevent;
978
979    torrentLock( t );
980
981    switch( e->eventType )
982    {
983        case TR_PEER_NEED_REQ:
984            refillSoon( t );
985            break;
986
987        case TR_PEER_CANCEL:
988            decrementPieceRequests( t, e->pieceIndex );
989            break;
990
991        case TR_PEER_PEER_GOT_DATA:
992        {
993            const time_t now = time( NULL );
994            tr_torrent * tor = t->tor;
995
996            tor->activityDate = now;
997
998            if( e->wasPieceData )
999                tor->uploadedCur += e->length;
1000
1001            /* update the stats */
1002            if( e->wasPieceData )
1003                tr_statsAddUploaded( tor->session, e->length );
1004
1005            /* update our atom */
1006            if( peer ) {
1007                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1008                a->piece_data_time = now;
1009            }
1010
1011            break;
1012        }
1013
1014        case TR_PEER_CLIENT_GOT_SUGGEST:
1015            if( peer )
1016                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1017            break;
1018
1019        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1020            if( peer )
1021                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1022            break;
1023
1024        case TR_PEER_CLIENT_GOT_DATA:
1025        {
1026            const time_t now = time( NULL );
1027            tr_torrent * tor = t->tor;
1028
1029            tor->activityDate = now;
1030
1031            /* only add this to downloadedCur if we got it from a peer --
1032             * webseeds shouldn't count against our ratio.  As one tracker
1033             * admin put it, "Those pieces are downloaded directly from the
1034             * content distributor, not the peers, it is the tracker's job
1035             * to manage the swarms, not the web server and does not fit
1036             * into the jurisdiction of the tracker." */
1037            if( peer && e->wasPieceData )
1038                tor->downloadedCur += e->length;
1039
1040            /* update the stats */ 
1041            if( e->wasPieceData )
1042                tr_statsAddDownloaded( tor->session, e->length );
1043
1044            /* update our atom */
1045            if( peer ) {
1046                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1047                a->piece_data_time = now;
1048            }
1049
1050            break;
1051        }
1052
1053        case TR_PEER_PEER_PROGRESS:
1054        {
1055            if( peer )
1056            {
1057                struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1058                const int          peerIsSeed = e->progress >= 1.0;
1059                if( peerIsSeed )
1060                {
1061                    tordbg( t, "marking peer %s as a seed",
1062                           tr_peerIoAddrStr( &atom->addr,
1063                                             atom->port ) );
1064                    atom->flags |= ADDED_F_SEED_FLAG;
1065                }
1066                else
1067                {
1068                    tordbg( t, "marking peer %s as a non-seed",
1069                           tr_peerIoAddrStr( &atom->addr,
1070                                             atom->port ) );
1071                    atom->flags &= ~ADDED_F_SEED_FLAG;
1072                }
1073            }
1074            break;
1075        }
1076
1077        case TR_PEER_CLIENT_GOT_BLOCK:
1078        {
1079            tr_torrent *     tor = t->tor;
1080
1081            tr_block_index_t block = _tr_block( tor, e->pieceIndex,
1082                                                e->offset );
1083
1084            tr_cpBlockAdd( tor->completion, block );
1085            decrementPieceRequests( t, e->pieceIndex );
1086
1087            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1088
1089            if( tr_cpPieceIsComplete( tor->completion, e->pieceIndex ) )
1090            {
1091                const tr_piece_index_t p = e->pieceIndex;
1092                const int              ok = tr_ioTestPiece( tor, p );
1093
1094                if( !ok )
1095                {
1096                    tr_torerr( tor,
1097                              _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1098                              (unsigned long)p );
1099                }
1100
1101                tr_torrentSetHasPiece( tor, p, ok );
1102                tr_torrentSetPieceChecked( tor, p, TRUE );
1103                tr_peerMgrSetBlame( tor->session->peerMgr, tor->info.hash, p, ok );
1104
1105                if( !ok )
1106                    gotBadPiece( t, p );
1107                else
1108                {
1109                    int        i, peerCount;
1110                    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1111                    for( i = 0; i < peerCount; ++i )
1112                        tr_peerMsgsHave( peers[i]->msgs, p );
1113                    tr_free( peers );
1114                }
1115
1116                tr_torrentRecheckCompleteness( tor );
1117            }
1118            break;
1119        }
1120
1121        case TR_PEER_ERROR:
1122            if( e->err == EINVAL )
1123            {
1124                addStrike( t, peer );
1125                peer->doPurge = 1;
1126            }
1127            else if( ( e->err == ERANGE )
1128                  || ( e->err == EMSGSIZE )
1129                  || ( e->err == ENOTCONN ) )
1130            {
1131                /* some protocol error from the peer */
1132                peer->doPurge = 1;
1133            }
1134            else /* a local error, such as an IO error */
1135            {
1136                t->tor->error = e->err;
1137                tr_strlcpy( t->tor->errorString,
1138                            tr_strerror( t->tor->error ),
1139                            sizeof( t->tor->errorString ) );
1140                tr_torrentStop( t->tor );
1141            }
1142            break;
1143
1144        default:
1145            assert( 0 );
1146    }
1147
1148    torrentUnlock( t );
1149}
1150
1151static void
1152ensureAtomExists( Torrent          * t,
1153                  const tr_address * addr,
1154                  tr_port            port,
1155                  uint8_t            flags,
1156                  uint8_t            from )
1157{
1158    if( getExistingAtom( t, addr ) == NULL )
1159    {
1160        struct peer_atom * a;
1161        a = tr_new0( struct peer_atom, 1 );
1162        a->addr = *addr;
1163        a->port = port;
1164        a->flags = flags;
1165        a->from = from;
1166        tordbg( t, "got a new atom: %s",
1167               tr_peerIoAddrStr( &a->addr, a->port ) );
1168        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
1169    }
1170}
1171
1172static int
1173getMaxPeerCount( const tr_torrent * tor )
1174{
1175    return tor->maxConnectedPeers;
1176}
1177
1178static int
1179getPeerCount( const Torrent * t )
1180{
1181    return tr_ptrArraySize( t->peers ) + tr_ptrArraySize(
1182               t->outgoingHandshakes );
1183}
1184
1185/* FIXME: this is kind of a mess. */
1186static int
1187myHandshakeDoneCB( tr_handshake *  handshake,
1188                   tr_peerIo *     io,
1189                   int             isConnected,
1190                   const uint8_t * peer_id,
1191                   void *          vmanager )
1192{
1193    int                ok = isConnected;
1194    int                success = FALSE;
1195    tr_port            port;
1196    const tr_address * addr;
1197    tr_peerMgr       * manager = vmanager;
1198    Torrent          * t;
1199    tr_handshake     * ours;
1200
1201    assert( io );
1202    assert( isConnected == 0 || isConnected == 1 );
1203
1204    t = tr_peerIoHasTorrentHash( io )
1205        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1206        : NULL;
1207
1208    if( tr_peerIoIsIncoming ( io ) )
1209        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
1210                                        handshake, handshakeCompare );
1211    else if( t )
1212        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
1213                                        handshake, handshakeCompare );
1214    else
1215        ours = handshake;
1216
1217    assert( ours );
1218    assert( ours == handshake );
1219
1220    if( t )
1221        torrentLock( t );
1222
1223    addr = tr_peerIoGetAddress( io, &port );
1224
1225    if( !ok || !t || !t->isRunning )
1226    {
1227        if( t )
1228        {
1229            struct peer_atom * atom = getExistingAtom( t, addr );
1230            if( atom )
1231                ++atom->numFails;
1232        }
1233
1234        tr_peerIoFree( io );
1235    }
1236    else /* looking good */
1237    {
1238        struct peer_atom * atom;
1239        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1240        atom = getExistingAtom( t, addr );
1241        atom->time = time( NULL );
1242
1243        if( atom->myflags & MYFLAG_BANNED )
1244        {
1245            tordbg( t, "banned peer %s tried to reconnect",
1246                   tr_peerIoAddrStr( &atom->addr,
1247                                     atom->port ) );
1248            tr_peerIoFree( io );
1249        }
1250        else if( tr_peerIoIsIncoming( io )
1251               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1252
1253        {
1254            tr_peerIoFree( io );
1255        }
1256        else
1257        {
1258            tr_peer * peer = getExistingPeer( t, addr );
1259
1260            if( peer ) /* we already have this peer */
1261            {
1262                tr_peerIoFree( io );
1263            }
1264            else
1265            {
1266                peer = getPeer( t, addr );
1267                tr_free( peer->client );
1268
1269                if( !peer_id )
1270                    peer->client = NULL;
1271                else {
1272                    char client[128];
1273                    tr_clientForId( client, sizeof( client ), peer_id );
1274                    peer->client = tr_strdup( client );
1275                }
1276
1277                peer->port = port;
1278                peer->io = io;
1279                peer->msgs = tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1280                tr_peerIoSetBandwidth( io, peer->bandwidth );
1281
1282                success = TRUE;
1283            }
1284        }
1285    }
1286
1287    if( t )
1288        torrentUnlock( t );
1289
1290    return success;
1291}
1292
1293void
1294tr_peerMgrAddIncoming( tr_peerMgr * manager,
1295                       tr_address * addr,
1296                       tr_port      port,
1297                       int          socket )
1298{
1299    managerLock( manager );
1300
1301    if( tr_sessionIsAddressBlocked( manager->session, addr ) )
1302    {
1303        tr_dbg( "Banned IP address \"%s\" tried to connect to us",
1304               tr_ntop_non_ts( addr ) );
1305        tr_netClose( socket );
1306    }
1307    else if( getExistingHandshake( manager->incomingHandshakes, addr ) )
1308    {
1309        tr_netClose( socket );
1310    }
1311    else /* we don't have a connetion to them yet... */
1312    {
1313        tr_peerIo *    io;
1314        tr_handshake * handshake;
1315
1316        io = tr_peerIoNewIncoming( manager->session, addr, port, socket );
1317
1318        handshake = tr_handshakeNew( io,
1319                                     manager->session->encryptionMode,
1320                                     myHandshakeDoneCB,
1321                                     manager );
1322
1323        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake,
1324                                 handshakeCompare );
1325    }
1326
1327    managerUnlock( manager );
1328}
1329
1330void
1331tr_peerMgrAddPex( tr_peerMgr *    manager,
1332                  const uint8_t * torrentHash,
1333                  uint8_t         from,
1334                  const tr_pex *  pex )
1335{
1336    Torrent * t;
1337
1338    managerLock( manager );
1339
1340    t = getExistingTorrent( manager, torrentHash );
1341    if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
1342        ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from );
1343
1344    managerUnlock( manager );
1345}
1346
1347tr_pex *
1348tr_peerMgrCompactToPex( const void *    compact,
1349                        size_t          compactLen,
1350                        const uint8_t * added_f,
1351                        size_t          added_f_len,
1352                        size_t *        pexCount )
1353{
1354    size_t          i;
1355    size_t          n = compactLen / 6;
1356    const uint8_t * walk = compact;
1357    tr_pex *        pex = tr_new0( tr_pex, n );
1358
1359    for( i = 0; i < n; ++i )
1360    {
1361        pex[i].addr.type = TR_AF_INET;
1362        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
1363        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1364        if( added_f && ( n == added_f_len ) )
1365            pex[i].flags = added_f[i];
1366    }
1367
1368    *pexCount = n;
1369    return pex;
1370}
1371
1372/**
1373***
1374**/
1375
1376void
1377tr_peerMgrSetBlame( tr_peerMgr *     manager,
1378                    const uint8_t *  torrentHash,
1379                    tr_piece_index_t pieceIndex,
1380                    int              success )
1381{
1382    if( !success )
1383    {
1384        int        peerCount, i;
1385        Torrent *  t = getExistingTorrent( manager, torrentHash );
1386        tr_peer ** peers;
1387
1388        assert( torrentIsLocked( t ) );
1389
1390        peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1391        for( i = 0; i < peerCount; ++i )
1392        {
1393            tr_peer * peer = peers[i];
1394            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1395            {
1396                tordbg(
1397                    t,
1398                    "peer %s contributed to corrupt piece (%d); now has %d strikes",
1399                    tr_peerIoAddrStr( &peer->addr, peer->port ),
1400                    pieceIndex, (int)peer->strikes + 1 );
1401                addStrike( t, peer );
1402            }
1403        }
1404    }
1405}
1406
1407int
1408tr_pexCompare( const void * va,
1409               const void * vb )
1410{
1411    const tr_pex * a = va;
1412    const tr_pex * b = vb;
1413    int            i = tr_compareAddresses( &a->addr, &b->addr );
1414
1415    if( i ) return i;
1416    if( a->port < b->port ) return -1;
1417    if( a->port > b->port ) return 1;
1418    return 0;
1419}
1420
1421int tr_pexCompare( const void * a,
1422                   const void * b );
1423
1424static int
1425peerPrefersCrypto( const tr_peer * peer )
1426{
1427    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1428        return TRUE;
1429
1430    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1431        return FALSE;
1432
1433    return tr_peerIoIsEncrypted( peer->io );
1434}
1435
1436int
1437tr_peerMgrGetPeers( tr_peerMgr *    manager,
1438                    const uint8_t * torrentHash,
1439                    tr_pex **       setme_pex )
1440{
1441    int peerCount = 0;
1442    const Torrent *  t;
1443
1444    managerLock( manager );
1445
1446    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1447    if( !t )
1448    {
1449        *setme_pex = NULL;
1450    }
1451    else
1452    {
1453        int i;
1454        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1455        tr_pex * pex = tr_new( tr_pex, peerCount );
1456        tr_pex * walk = pex;
1457
1458        for( i = 0; i < peerCount; ++i, ++walk )
1459        {
1460            const tr_peer * peer = peers[i];
1461            walk->addr = peer->addr;
1462            walk->port = peer->port;
1463            walk->flags = 0;
1464            if( peerPrefersCrypto( peer ) ) walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1465            if( peer->progress >= 1.0 ) walk->flags |= ADDED_F_SEED_FLAG;
1466        }
1467
1468        assert( ( walk - pex ) == peerCount );
1469        qsort( pex, peerCount, sizeof( tr_pex ), tr_pexCompare );
1470        *setme_pex = pex;
1471    }
1472
1473    managerUnlock( manager );
1474    return peerCount;
1475}
1476
1477static int reconnectPulse( void * vtorrent );
1478
1479static int rechokePulse( void * vtorrent );
1480
1481void
1482tr_peerMgrStartTorrent( tr_peerMgr *    manager,
1483                        const uint8_t * torrentHash )
1484{
1485    Torrent * t;
1486
1487    managerLock( manager );
1488
1489    t = getExistingTorrent( manager, torrentHash );
1490
1491    assert( t );
1492    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1493    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1494
1495    if( !t->isRunning )
1496    {
1497        t->isRunning = 1;
1498
1499        t->reconnectTimer = tr_timerNew( t->manager->session,
1500                                         reconnectPulse, t,
1501                                         RECONNECT_PERIOD_MSEC );
1502
1503        t->rechokeTimer = tr_timerNew( t->manager->session,
1504                                       rechokePulse, t,
1505                                       RECHOKE_PERIOD_MSEC );
1506
1507        reconnectPulse( t );
1508
1509        rechokePulse( t );
1510
1511        if( !tr_ptrArrayEmpty( t->webseeds ) )
1512            refillSoon( t );
1513    }
1514
1515    managerUnlock( manager );
1516}
1517
1518static void
1519stopTorrent( Torrent * t )
1520{
1521    assert( torrentIsLocked( t ) );
1522
1523    t->isRunning = 0;
1524    tr_timerFree( &t->rechokeTimer );
1525    tr_timerFree( &t->reconnectTimer );
1526
1527    /* disconnect the peers. */
1528    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)peerDestructor );
1529    tr_ptrArrayClear( t->peers );
1530
1531    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1532     * which removes the handshake from t->outgoingHandshakes... */
1533    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1534        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1535}
1536
1537void
1538tr_peerMgrStopTorrent( tr_peerMgr *    manager,
1539                       const uint8_t * torrentHash )
1540{
1541    managerLock( manager );
1542
1543    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1544
1545    managerUnlock( manager );
1546}
1547
1548void
1549tr_peerMgrAddTorrent( tr_peerMgr * manager,
1550                      tr_torrent * tor )
1551{
1552    Torrent * t;
1553
1554    managerLock( manager );
1555
1556    assert( tor );
1557    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1558
1559    t = torrentConstructor( manager, tor );
1560    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1561
1562    managerUnlock( manager );
1563}
1564
1565void
1566tr_peerMgrRemoveTorrent( tr_peerMgr *    manager,
1567                         const uint8_t * torrentHash )
1568{
1569    Torrent * t;
1570
1571    managerLock( manager );
1572
1573    t = getExistingTorrent( manager, torrentHash );
1574    assert( t );
1575    stopTorrent( t );
1576    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1577    torrentDestructor( t );
1578
1579    managerUnlock( manager );
1580}
1581
1582void
1583tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1584                               const uint8_t *    torrentHash,
1585                               int8_t *           tab,
1586                               unsigned int       tabCount )
1587{
1588    tr_piece_index_t   i;
1589    const Torrent *    t;
1590    const tr_torrent * tor;
1591    float              interval;
1592    int                isSeed;
1593    int                peerCount;
1594    const tr_peer **   peers;
1595
1596    managerLock( manager );
1597
1598    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1599    tor = t->tor;
1600    interval = tor->info.pieceCount / (float)tabCount;
1601    isSeed = tor && ( tr_cpGetStatus ( tor->completion ) == TR_SEED );
1602    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1603
1604    memset( tab, 0, tabCount );
1605
1606    for( i = 0; tor && i < tabCount; ++i )
1607    {
1608        const int piece = i * interval;
1609
1610        if( isSeed || tr_cpPieceIsComplete( tor->completion, piece ) )
1611            tab[i] = -1;
1612        else if( peerCount )
1613        {
1614            int j;
1615            for( j = 0; j < peerCount; ++j )
1616                if( tr_bitfieldHas( peers[j]->have, i ) )
1617                    ++tab[i];
1618        }
1619    }
1620
1621    managerUnlock( manager );
1622}
1623
1624/* Returns the pieces that are available from peers */
1625tr_bitfield*
1626tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1627                        const uint8_t *    torrentHash )
1628{
1629    int           i, size;
1630    Torrent *     t;
1631    tr_peer **    peers;
1632    tr_bitfield * pieces;
1633
1634    managerLock( manager );
1635
1636    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1637    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1638    peers = getConnectedPeers( t, &size );
1639    for( i = 0; i < size; ++i )
1640        tr_bitfieldOr( pieces, peers[i]->have );
1641
1642    managerUnlock( manager );
1643    tr_free( peers );
1644    return pieces;
1645}
1646
1647int
1648tr_peerMgrHasConnections( const tr_peerMgr * manager,
1649                          const uint8_t *    torrentHash )
1650{
1651    int             ret;
1652    const Torrent * t;
1653
1654    managerLock( manager );
1655
1656    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1657    ret = t && ( !tr_ptrArrayEmpty( t->peers )
1658               || !tr_ptrArrayEmpty( t->webseeds ) );
1659
1660    managerUnlock( manager );
1661    return ret;
1662}
1663
1664void
1665tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1666                        const uint8_t *    torrentHash,
1667                        int *              setmePeersKnown,
1668                        int *              setmePeersConnected,
1669                        int *              setmeSeedsConnected,
1670                        int *              setmeWebseedsSendingToUs,
1671                        int *              setmePeersSendingToUs,
1672                        int *              setmePeersGettingFromUs,
1673                        int *              setmePeersFrom )
1674{
1675    int                 i, size;
1676    const Torrent *     t;
1677    const tr_peer **    peers;
1678    const tr_webseed ** webseeds;
1679
1680    managerLock( manager );
1681
1682    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1683    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1684
1685    *setmePeersKnown           = tr_ptrArraySize( t->pool );
1686    *setmePeersConnected       = 0;
1687    *setmeSeedsConnected       = 0;
1688    *setmePeersGettingFromUs   = 0;
1689    *setmePeersSendingToUs     = 0;
1690    *setmeWebseedsSendingToUs  = 0;
1691
1692    for( i = 0; i < TR_PEER_FROM__MAX; ++i )
1693        setmePeersFrom[i] = 0;
1694
1695    for( i = 0; i < size; ++i )
1696    {
1697        const tr_peer *          peer = peers[i];
1698        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1699
1700        if( peer->io == NULL ) /* not connected */
1701            continue;
1702
1703        ++ * setmePeersConnected;
1704
1705        ++setmePeersFrom[atom->from];
1706
1707        if( clientIsDownloadingFrom( peer ) )
1708            ++ * setmePeersSendingToUs;
1709
1710        if( clientIsUploadingTo( peer ) )
1711            ++ * setmePeersGettingFromUs;
1712
1713        if( atom->flags & ADDED_F_SEED_FLAG )
1714            ++ * setmeSeedsConnected;
1715    }
1716
1717    webseeds = (const tr_webseed **) tr_ptrArrayPeek( t->webseeds, &size );
1718    for( i = 0; i < size; ++i )
1719    {
1720        if( tr_webseedIsActive( webseeds[i] ) )
1721            ++ * setmeWebseedsSendingToUs;
1722    }
1723
1724    managerUnlock( manager );
1725}
1726
1727float*
1728tr_peerMgrWebSpeeds( const tr_peerMgr * manager,
1729                     const uint8_t *    torrentHash )
1730{
1731    const Torrent *     t;
1732    const tr_webseed ** webseeds;
1733    int                 i;
1734    int                 webseedCount;
1735    float *             ret;
1736
1737    assert( manager );
1738    managerLock( manager );
1739
1740    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1741    webseeds = (const tr_webseed**) tr_ptrArrayPeek( t->webseeds,
1742                                                     &webseedCount );
1743    assert( webseedCount == t->tor->info.webseedCount );
1744    ret = tr_new0( float, webseedCount );
1745
1746    for( i = 0; i < webseedCount; ++i )
1747        if( !tr_webseedGetSpeed( webseeds[i], &ret[i] ) )
1748            ret[i] = -1.0;
1749
1750    managerUnlock( manager );
1751    return ret;
1752}
1753
1754double
1755tr_peerGetPieceSpeed( const tr_peer    * peer,
1756                      tr_direction       direction )
1757{
1758    assert( peer );
1759    assert( direction==TR_CLIENT_TO_PEER || direction==TR_PEER_TO_CLIENT );
1760
1761    return tr_bandwidthGetPieceSpeed( peer->bandwidth, direction );
1762}
1763
1764
1765struct tr_peer_stat *
1766tr_peerMgrPeerStats( const   tr_peerMgr  * manager,
1767                     const   uint8_t     * torrentHash,
1768                     int                 * setmeCount UNUSED )
1769{
1770    int             i, size;
1771    const Torrent * t;
1772    tr_peer **      peers;
1773    tr_peer_stat *  ret;
1774
1775    assert( manager );
1776    managerLock( manager );
1777
1778    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1779    peers = getConnectedPeers( (Torrent*)t, &size );
1780    ret = tr_new0( tr_peer_stat, size );
1781
1782    for( i = 0; i < size; ++i )
1783    {
1784        char *                   pch;
1785        const tr_peer *          peer = peers[i];
1786        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1787        tr_peer_stat *           stat = ret + i;
1788
1789        tr_ntop( &peer->addr, stat->addr, sizeof( stat->addr ) );
1790        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
1791                   sizeof( stat->client ) );
1792        stat->port               = ntohs( peer->port );
1793        stat->from               = atom->from;
1794        stat->progress           = peer->progress;
1795        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1796        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER );
1797        stat->rateToClient       = tr_peerGetPieceSpeed( peer, TR_PEER_TO_CLIENT );
1798        stat->peerIsChoked       = peer->peerIsChoked;
1799        stat->peerIsInterested   = peer->peerIsInterested;
1800        stat->clientIsChoked     = peer->clientIsChoked;
1801        stat->clientIsInterested = peer->clientIsInterested;
1802        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1803        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1804        stat->isUploadingTo      = clientIsUploadingTo( peer );
1805
1806        pch = stat->flagStr;
1807        if( t->optimistic == peer ) *pch++ = 'O';
1808        if( stat->isDownloadingFrom ) *pch++ = 'D';
1809        else if( stat->clientIsInterested ) *pch++ = 'd';
1810        if( stat->isUploadingTo ) *pch++ = 'U';
1811        else if( stat->peerIsInterested ) *pch++ = 'u';
1812        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ =
1813                'K';
1814        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1815        if( stat->isEncrypted ) *pch++ = 'E';
1816        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1817        if( stat->isIncoming ) *pch++ = 'I';
1818        *pch = '\0';
1819    }
1820
1821    *setmeCount = size;
1822    tr_free( peers );
1823
1824    managerUnlock( manager );
1825    return ret;
1826}
1827
1828/**
1829***
1830**/
1831
1832struct ChokeData
1833{
1834    tr_bool         doUnchoke;
1835    tr_bool         isInterested;
1836    tr_bool         isChoked;
1837    int             rate;
1838    tr_peer *       peer;
1839};
1840
1841static int
1842compareChoke( const void * va,
1843              const void * vb )
1844{
1845    const struct ChokeData * a = va;
1846    const struct ChokeData * b = vb;
1847
1848    if( a->rate != b->rate ) /* prefer higher overall speeds */
1849        return a->rate > b->rate ? -1 : 1;
1850
1851    if( a->isChoked != b->isChoked ) /* prefer unchoked */
1852        return a->isChoked ? 1 : -1;
1853
1854    return 0;
1855}
1856
1857static int
1858isNew( const tr_peer * peer )
1859{
1860    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1861}
1862
1863static int
1864isSame( const tr_peer * peer )
1865{
1866    return peer && peer->client && strstr( peer->client, "Transmission" );
1867}
1868
1869/**
1870***
1871**/
1872
1873static void
1874rechoke( Torrent * t )
1875{
1876    int                i, peerCount, size, unchokedInterested;
1877    tr_peer **         peers = getConnectedPeers( t, &peerCount );
1878    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1879    const int          chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
1880
1881    assert( torrentIsLocked( t ) );
1882
1883    /* sort the peers by preference and rate */
1884    for( i = 0, size = 0; i < peerCount; ++i )
1885    {
1886        tr_peer * peer = peers[i];
1887        if( peer->progress >= 1.0 ) /* choke all seeds */
1888            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1889        else if( chokeAll )
1890            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1891        else {
1892            struct ChokeData * n = &choke[size++];
1893            n->peer         = peer;
1894            n->isInterested = peer->peerIsInterested;
1895            n->isChoked     = peer->peerIsChoked;
1896            n->rate         = tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER ) * 1024;
1897        }
1898    }
1899
1900    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
1901
1902    /**
1903     * Reciprocation and number of uploads capping is managed by unchoking
1904     * the N peers which have the best upload rate and are interested.
1905     * This maximizes the client's download rate. These N peers are
1906     * referred to as downloaders, because they are interested in downloading
1907     * from the client.
1908     *
1909     * Peers which have a better upload rate (as compared to the downloaders)
1910     * but aren't interested get unchoked. If they become interested, the
1911     * downloader with the worst upload rate gets choked. If a client has
1912     * a complete file, it uses its upload rate rather than its download
1913     * rate to decide which peers to unchoke.
1914     */
1915    unchokedInterested = 0;
1916    for( i = 0; i < size && unchokedInterested < MAX_UNCHOKED_PEERS; ++i )
1917    {
1918        choke[i].doUnchoke = 1;
1919        if( choke[i].isInterested )
1920            ++unchokedInterested;
1921    }
1922
1923    /* optimistic unchoke */
1924    if( i < size )
1925    {
1926        int                n;
1927        struct ChokeData * c;
1928        tr_ptrArray *      randPool = tr_ptrArrayNew( );
1929
1930        for( ; i < size; ++i )
1931        {
1932            if( choke[i].isInterested )
1933            {
1934                const tr_peer * peer = choke[i].peer;
1935                int             x = 1, y;
1936                if( isNew( peer ) ) x *= 3;
1937                if( isSame( peer ) ) x *= 3;
1938                for( y = 0; y < x; ++y )
1939                    tr_ptrArrayAppend( randPool, &choke[i] );
1940            }
1941        }
1942
1943        if( ( n = tr_ptrArraySize( randPool ) ) )
1944        {
1945            c = tr_ptrArrayNth( randPool, tr_cryptoWeakRandInt( n ) );
1946            c->doUnchoke = 1;
1947            t->optimistic = c->peer;
1948        }
1949
1950        tr_ptrArrayFree( randPool, NULL );
1951    }
1952
1953    for( i = 0; i < size; ++i )
1954        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1955
1956    /* cleanup */
1957    tr_free( choke );
1958    tr_free( peers );
1959}
1960
1961static int
1962rechokePulse( void * vtorrent )
1963{
1964    Torrent * t = vtorrent;
1965
1966    torrentLock( t );
1967    rechoke( t );
1968    torrentUnlock( t );
1969    return TRUE;
1970}
1971
1972/***
1973****
1974****  Life and Death
1975****
1976***/
1977
1978static int
1979shouldPeerBeClosed( const Torrent * t,
1980                    const tr_peer * peer,
1981                    int             peerCount )
1982{
1983    const tr_torrent *       tor = t->tor;
1984    const time_t             now = time( NULL );
1985    const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1986
1987    /* if it's marked for purging, close it */
1988    if( peer->doPurge )
1989    {
1990        tordbg( t, "purging peer %s because its doPurge flag is set",
1991               tr_peerIoAddrStr( &atom->addr,
1992                                 atom->port ) );
1993        return TRUE;
1994    }
1995
1996    /* if we're seeding and the peer has everything we have,
1997     * and enough time has passed for a pex exchange, then disconnect */
1998    if( tr_torrentIsSeed( tor ) )
1999    {
2000        int peerHasEverything;
2001        if( atom->flags & ADDED_F_SEED_FLAG )
2002            peerHasEverything = TRUE;
2003        else if( peer->progress < tr_cpPercentDone( tor->completion ) )
2004            peerHasEverything = FALSE;
2005        else
2006        {
2007            tr_bitfield * tmp =
2008                tr_bitfieldDup( tr_cpPieceBitfield( tor->completion ) );
2009            tr_bitfieldDifference( tmp, peer->have );
2010            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2011            tr_bitfieldFree( tmp );
2012        }
2013        if( peerHasEverything
2014          && ( !tr_torrentAllowsPex( tor ) || ( now - atom->time >= 30 ) ) )
2015        {
2016            tordbg( t, "purging peer %s because we're both seeds",
2017                   tr_peerIoAddrStr( &atom->addr,
2018                                     atom->port ) );
2019            return TRUE;
2020        }
2021    }
2022
2023    /* disconnect if it's been too long since piece data has been transferred.
2024     * this is on a sliding scale based on number of available peers... */
2025    {
2026        const int    relaxStrictnessIfFewerThanN =
2027            (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2028        /* if we have >= relaxIfFewerThan, strictness is 100%.
2029         * if we have zero connections, strictness is 0% */
2030        const float  strictness = peerCount >= relaxStrictnessIfFewerThanN
2031                                  ? 1.0
2032                                  : peerCount /
2033                                  (float)relaxStrictnessIfFewerThanN;
2034        const int    lo = MIN_UPLOAD_IDLE_SECS;
2035        const int    hi = MAX_UPLOAD_IDLE_SECS;
2036        const int    limit = lo + ( ( hi - lo ) * strictness );
2037        const time_t then = peer->pieceDataActivityDate;
2038        const int    idleTime = then ? ( now - then ) : 0;
2039        if( idleTime > limit )
2040        {
2041            tordbg(
2042                t,
2043                "purging peer %s because it's been %d secs since we shared anything",
2044                tr_peerIoAddrStr( &atom->addr, atom->port ), idleTime );
2045            return TRUE;
2046        }
2047    }
2048
2049    return FALSE;
2050}
2051
2052static tr_peer **
2053getPeersToClose( Torrent * t,
2054                 int *     setmeSize )
2055{
2056    int               i, peerCount, outsize;
2057    tr_peer **        peers = (tr_peer**) tr_ptrArrayPeek( t->peers,
2058                                                           &peerCount );
2059    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2060
2061    assert( torrentIsLocked( t ) );
2062
2063    for( i = outsize = 0; i < peerCount; ++i )
2064        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
2065            ret[outsize++] = peers[i];
2066
2067    *setmeSize = outsize;
2068    return ret;
2069}
2070
2071static int
2072compareCandidates( const void * va,
2073                   const void * vb )
2074{
2075    const struct peer_atom * a = *(const struct peer_atom**) va;
2076    const struct peer_atom * b = *(const struct peer_atom**) vb;
2077
2078    /* <Charles> Here we would probably want to try reconnecting to
2079     * peers that had most recently given us data. Lots of users have
2080     * trouble with resets due to their routers and/or ISPs. This way we
2081     * can quickly recover from an unwanted reset. So we sort
2082     * piece_data_time in descending order.
2083     */
2084
2085    if( a->piece_data_time != b->piece_data_time )
2086        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2087
2088    if( a->numFails != b->numFails )
2089        return a->numFails < b->numFails ? -1 : 1;
2090
2091    if( a->time != b->time )
2092        return a->time < b->time ? -1 : 1;
2093
2094    return 0;
2095}
2096
2097static int
2098getReconnectIntervalSecs( const struct peer_atom * atom )
2099{
2100    int          sec;
2101    const time_t now = time( NULL );
2102
2103    /* if we were recently connected to this peer and transferring piece
2104     * data, try to reconnect to them sooner rather that later -- we don't
2105     * want network troubles to get in the way of a good peer. */
2106    if( ( now - atom->piece_data_time ) <=
2107       ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2108        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2109
2110    /* don't allow reconnects more often than our minimum */
2111    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2112        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2113
2114    /* otherwise, the interval depends on how many times we've tried
2115     * and failed to connect to the peer */
2116    else switch( atom->numFails )
2117        {
2118            case 0:
2119                sec = 0; break;
2120
2121            case 1:
2122                sec = 5; break;
2123
2124            case 2:
2125                sec = 2 * 60; break;
2126
2127            case 3:
2128                sec = 15 * 60; break;
2129
2130            case 4:
2131                sec = 30 * 60; break;
2132
2133            case 5:
2134                sec = 60 * 60; break;
2135
2136            default:
2137                sec = 120 * 60; break;
2138        }
2139
2140    return sec;
2141}
2142
2143static struct peer_atom **
2144getPeerCandidates(                               Torrent * t,
2145                                           int * setmeSize )
2146{
2147    int                 i, atomCount, retCount;
2148    struct peer_atom ** atoms;
2149    struct peer_atom ** ret;
2150    const time_t        now = time( NULL );
2151    const int           seed = tr_torrentIsSeed( t->tor );
2152
2153    assert( torrentIsLocked( t ) );
2154
2155    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &atomCount );
2156    ret = tr_new( struct peer_atom*, atomCount );
2157    for( i = retCount = 0; i < atomCount; ++i )
2158    {
2159        int                interval;
2160        struct peer_atom * atom = atoms[i];
2161
2162        /* peer fed us too much bad data ... we only keep it around
2163         * now to weed it out in case someone sends it to us via pex */
2164        if( atom->myflags & MYFLAG_BANNED )
2165            continue;
2166
2167        /* peer was unconnectable before, so we're not going to keep trying.
2168         * this is needs a separate flag from `banned', since if they try
2169         * to connect to us later, we'll let them in */
2170        if( atom->myflags & MYFLAG_UNREACHABLE )
2171            continue;
2172
2173        /* we don't need two connections to the same peer... */
2174        if( peerIsInUse( t, &atom->addr ) )
2175            continue;
2176
2177        /* no need to connect if we're both seeds... */
2178        if( seed && ( atom->flags & ADDED_F_SEED_FLAG ) )
2179            continue;
2180
2181        /* don't reconnect too often */
2182        interval = getReconnectIntervalSecs( atom );
2183        if( ( now - atom->time ) < interval )
2184        {
2185            tordbg(
2186                t,
2187                "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2188                i, tr_peerIoAddrStr( &atom->addr,
2189                                     atom->port ), interval );
2190            continue;
2191        }
2192
2193        /* Don't connect to peers in our blocklist */
2194        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2195            continue;
2196
2197        ret[retCount++] = atom;
2198    }
2199
2200    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2201    *setmeSize = retCount;
2202    return ret;
2203}
2204
2205static int
2206reconnectPulse( void * vtorrent )
2207{
2208    Torrent *     t = vtorrent;
2209    static time_t prevTime = 0;
2210    static int    newConnectionsThisSecond = 0;
2211    time_t        now;
2212
2213    torrentLock( t );
2214
2215    now = time( NULL );
2216    if( prevTime != now )
2217    {
2218        prevTime = now;
2219        newConnectionsThisSecond = 0;
2220    }
2221
2222    if( !t->isRunning )
2223    {
2224        removeAllPeers( t );
2225    }
2226    else
2227    {
2228        int                 i, nCandidates, nBad;
2229        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
2230        struct tr_peer **   connections = getPeersToClose( t, &nBad );
2231
2232        if( nBad || nCandidates )
2233            tordbg(
2234                t, "reconnect pulse for [%s]: %d bad connections, "
2235                   "%d connection candidates, %d atoms, max per pulse is %d",
2236                t->tor->info.name, nBad, nCandidates,
2237                tr_ptrArraySize( t->pool ),
2238                (int)MAX_RECONNECTIONS_PER_PULSE );
2239
2240        /* disconnect some peers.
2241           if we transferred piece data, then they might be good peers,
2242           so reset their `numFails' weight to zero.  otherwise we connected
2243           to them fruitlessly, so mark it as another fail */
2244        for( i = 0; i < nBad; ++i )
2245        {
2246            tr_peer *          peer = connections[i];
2247            struct peer_atom * atom = getExistingAtom( t, &peer->addr );
2248            if( peer->pieceDataActivityDate )
2249                atom->numFails = 0;
2250            else
2251                ++atom->numFails;
2252            tordbg( t, "removing bad peer %s",
2253                   tr_peerIoGetAddrStr( peer->io ) );
2254            removePeer( t, peer );
2255        }
2256
2257        /* add some new ones */
2258        for( i = 0;    ( i < nCandidates )
2259           && ( i < MAX_RECONNECTIONS_PER_PULSE )
2260           && ( getPeerCount( t ) < getMaxPeerCount( t->tor ) )
2261           && ( newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND );
2262             ++i )
2263        {
2264            tr_peerMgr *       mgr = t->manager;
2265            struct peer_atom * atom = candidates[i];
2266            tr_peerIo *        io;
2267
2268            tordbg( t, "Starting an OUTGOING connection with %s",
2269                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2270
2271            io =
2272                tr_peerIoNewOutgoing( mgr->session, &atom->addr, atom->port,
2273                                      t->hash );
2274            if( io == NULL )
2275            {
2276                atom->myflags |= MYFLAG_UNREACHABLE;
2277            }
2278            else
2279            {
2280                tr_handshake * handshake = tr_handshakeNew(
2281                    io,
2282                    mgr->session->
2283                    encryptionMode,
2284                    myHandshakeDoneCB,
2285                    mgr );
2286
2287                assert( tr_peerIoGetTorrentHash( io ) );
2288
2289                ++newConnectionsThisSecond;
2290
2291                tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake,
2292                                         handshakeCompare );
2293            }
2294
2295            atom->time = time( NULL );
2296        }
2297
2298        /* cleanup */
2299        tr_free( connections );
2300        tr_free( candidates );
2301    }
2302
2303    torrentUnlock( t );
2304    return TRUE;
2305}
2306
2307/****
2308*****
2309*****  BANDWIDTH ALLOCATION
2310*****
2311****/
2312
2313static void
2314pumpAllPeers( tr_peerMgr * mgr )
2315{
2316    const int torrentCount = tr_ptrArraySize( mgr->torrents );
2317    int       i, j;
2318
2319    for( i=0; i<torrentCount; ++i )
2320    {
2321        Torrent * t = tr_ptrArrayNth( mgr->torrents, i );
2322        for( j=0; j<tr_ptrArraySize( t->peers ); ++j )
2323        {
2324            tr_peer * peer = tr_ptrArrayNth( t->peers, j );
2325            tr_peerMsgsPulse( peer->msgs );
2326        }
2327    }
2328}
2329
2330static int
2331bandwidthPulse( void * vmgr )
2332{
2333    tr_peerMgr * mgr = vmgr;
2334    managerLock( mgr );
2335
2336    pumpAllPeers( mgr );
2337    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
2338    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
2339    pumpAllPeers( mgr );
2340
2341    managerUnlock( mgr );
2342    return TRUE;
2343}
Note: See TracBrowser for help on using the repository browser.