source: trunk/libtransmission/peer-mgr.c @ 7357

Last change on this file since 7357 was 7357, checked in by charles, 12 years ago

(libT) add some documentation about the three separate peer structs and how they are related.

  • Property svn:keywords set to Date Rev Author Id
File size: 64.8 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 7357 2008-12-11 17:02:34Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h> /* memcpy, memcmp, strstr */
16#include <stdlib.h> /* qsort */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "bandwidth.h"
23#include "bencode.h"
24#include "blocklist.h"
25#include "clients.h"
26#include "completion.h"
27#include "crypto.h"
28#include "handshake.h"
29#include "inout.h" /* tr_ioTestPiece */
30#include "net.h"
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-mgr-private.h"
34#include "peer-msgs.h"
35#include "ptrarray.h"
36#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
37#include "torrent.h"
38#include "trevent.h"
39#include "utils.h"
40#include "webseed.h"
41
42enum
43{
44    /* how frequently to change which peers are choked */
45    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
46
47    /* minimum interval for refilling peers' request lists */
48    REFILL_PERIOD_MSEC = 333,
49
50    /* when many peers are available, keep idle ones this long */
51    MIN_UPLOAD_IDLE_SECS = ( 30 ),
52
53    /* when few peers are available, keep idle ones this long */
54    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
55
56    /* how frequently to decide which peers live and die */
57    RECONNECT_PERIOD_MSEC = ( 2 * 1000 ),
58   
59    /* how frequently to reallocate bandwidth */
60    BANDWIDTH_PERIOD_MSEC = 250,
61
62    /* max # of peers to ask fer per torrent per reconnect pulse */
63    MAX_RECONNECTIONS_PER_PULSE = 4,
64
65    /* max number of peers to ask for per second overall.
66    * this throttle is to avoid overloading the router */
67    MAX_CONNECTIONS_PER_SECOND = 8,
68
69    /* number of unchoked peers per torrent.
70     * FIXME: this probably ought to be configurable */
71    MAX_UNCHOKED_PEERS = 14,
72
73    /* number of bad pieces a peer is allowed to send before we ban them */
74    MAX_BAD_PIECES_PER_PEER = 5,
75
76    /* use for bitwise operations w/peer_atom.myflags */
77    MYFLAG_BANNED = 1,
78
79    /* unreachable for now... but not banned.
80     * if they try to connect to us it's okay */
81    MYFLAG_UNREACHABLE = 2,
82
83    /* the minimum we'll wait before attempting to reconnect to a peer */
84    MINIMUM_RECONNECT_INTERVAL_SECS = 5
85};
86
87
88/**
89***
90**/
91
92enum
93{
94    UPLOAD_ONLY_UKNOWN,
95    UPLOAD_ONLY_YES,
96    UPLOAD_ONLY_NO
97};
98
99/**
100 * Peer information that should be kept even before we've connected and
101 * after we've disconnected.  These are kept in a pool of peer_atoms to decide
102 * which ones would make good candidates for connecting to, and to watch out
103 * for banned peers.
104 *
105 * @see tr_peer
106 * @see tr_peermsgs
107 */
108struct peer_atom
109{
110    uint8_t     from;
111    uint8_t     flags;       /* these match the added_f flags */
112    uint8_t     myflags;     /* flags that aren't defined in added_f */
113    uint8_t     uploadOnly;  /* UPLOAD_ONLY_ */
114    uint8_t     partialSeed;
115    tr_port     port;
116    uint16_t    numFails;
117    tr_address  addr;
118    time_t      time;        /* when the peer's connection status last changed */
119    time_t      piece_data_time;
120};
121
122typedef struct
123{
124    tr_bool         isRunning;
125
126    uint8_t         hash[SHA_DIGEST_LENGTH];
127    int         *   pendingRequestCount;
128    tr_ptrArray *   outgoingHandshakes; /* tr_handshake */
129    tr_ptrArray *   pool; /* struct peer_atom */
130    tr_ptrArray *   peers; /* tr_peer */
131    tr_ptrArray *   webseeds; /* tr_webseed */
132    tr_timer *      reconnectTimer;
133    tr_timer *      rechokeTimer;
134    tr_timer *      refillTimer;
135    tr_torrent *    tor;
136    tr_peer *       optimistic; /* the optimistic peer, or NULL if none */
137
138    struct tr_peerMgr * manager;
139}
140Torrent;
141
142struct tr_peerMgr
143{
144    tr_session      * session;
145    tr_ptrArray     * torrents; /* Torrent */
146    tr_ptrArray     * incomingHandshakes; /* tr_handshake */
147    tr_timer        * bandwidthTimer;
148};
149
150#define tordbg( t, ... ) \
151    do { \
152        if( tr_deepLoggingIsActive( ) ) \
153            tr_deepLog( __FILE__, __LINE__, t->tor->info.name, __VA_ARGS__ ); \
154    } while( 0 )
155
156#define dbgmsg( ... ) \
157    do { \
158        if( tr_deepLoggingIsActive( ) ) \
159            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
160    } while( 0 )
161
162/**
163***
164**/
165
166static void
167managerLock( const struct tr_peerMgr * manager )
168{
169    tr_globalLock( manager->session );
170}
171
172static void
173managerUnlock( const struct tr_peerMgr * manager )
174{
175    tr_globalUnlock( manager->session );
176}
177
178static void
179torrentLock( Torrent * torrent )
180{
181    managerLock( torrent->manager );
182}
183
184static void
185torrentUnlock( Torrent * torrent )
186{
187    managerUnlock( torrent->manager );
188}
189
190static int
191torrentIsLocked( const Torrent * t )
192{
193    return tr_globalIsLocked( t->manager->session );
194}
195
196/**
197***
198**/
199
200static int
201handshakeCompareToAddr( const void * va,
202                        const void * vb )
203{
204    const tr_handshake * a = va;
205
206    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
207}
208
209static int
210handshakeCompare( const void * a,
211                  const void * b )
212{
213    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
214}
215
216static tr_handshake*
217getExistingHandshake( tr_ptrArray      * handshakes,
218                      const tr_address * addr )
219{
220    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
221}
222
223static int
224comparePeerAtomToAddress( const void * va, const void * vb )
225{
226    const struct peer_atom * a = va;
227
228    return tr_compareAddresses( &a->addr, vb );
229}
230
231static int
232comparePeerAtoms( const void * va, const void * vb )
233{
234    const struct peer_atom * b = vb;
235
236    return comparePeerAtomToAddress( va, &b->addr );
237}
238
239/**
240***
241**/
242
243static int
244torrentCompare( const void * va,
245                const void * vb )
246{
247    const Torrent * a = va;
248    const Torrent * b = vb;
249
250    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
251}
252
253static int
254torrentCompareToHash( const void * va,
255                      const void * vb )
256{
257    const Torrent * a = va;
258    const uint8_t * b_hash = vb;
259
260    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
261}
262
263static Torrent*
264getExistingTorrent( tr_peerMgr *    manager,
265                    const uint8_t * hash )
266{
267    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
268                                             hash,
269                                             torrentCompareToHash );
270}
271
272static int
273peerCompare( const void * va, const void * vb )
274{
275    const tr_peer * a = va;
276    const tr_peer * b = vb;
277
278    return tr_compareAddresses( &a->addr, &b->addr );
279}
280
281static int
282peerCompareToAddr( const void * va, const void * vb )
283{
284    const tr_peer * a = va;
285
286    return tr_compareAddresses( &a->addr, vb );
287}
288
289static tr_peer*
290getExistingPeer( Torrent          * torrent,
291                 const tr_address * addr )
292{
293    assert( torrentIsLocked( torrent ) );
294    assert( addr );
295
296    return tr_ptrArrayFindSorted( torrent->peers, addr, peerCompareToAddr );
297}
298
299static struct peer_atom*
300getExistingAtom( const Torrent    * t,
301                 const tr_address * addr )
302{
303    assert( torrentIsLocked( t ) );
304    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
305}
306
307static tr_bool
308peerIsInUse( const Torrent    * ct,
309             const tr_address * addr )
310{
311    Torrent * t = (Torrent*) ct;
312
313    assert( torrentIsLocked ( t ) );
314
315    return getExistingPeer( t, addr )
316        || getExistingHandshake( t->outgoingHandshakes, addr )
317        || getExistingHandshake( t->manager->incomingHandshakes, addr );
318}
319
320static tr_peer*
321peerConstructor( tr_torrent * tor, const tr_address * addr )
322{
323    tr_peer * p;
324    p = tr_new0( tr_peer, 1 );
325    p->addr = *addr;
326    p->bandwidth = tr_bandwidthNew( tor->session, tor->bandwidth );
327    return p;
328}
329
330static tr_peer*
331getPeer( Torrent          * torrent,
332         const tr_address * addr )
333{
334    tr_peer * peer;
335
336    assert( torrentIsLocked( torrent ) );
337
338    peer = getExistingPeer( torrent, addr );
339
340    if( peer == NULL )
341    {
342        peer = peerConstructor( torrent->tor, addr );
343        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
344    }
345
346    return peer;
347}
348
349static void
350peerDestructor( tr_peer * peer )
351{
352    assert( peer );
353
354    if( peer->msgs != NULL )
355    {
356        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
357        tr_peerMsgsFree( peer->msgs );
358    }
359
360    tr_peerIoFree( peer->io );
361
362    tr_bitfieldFree( peer->have );
363    tr_bitfieldFree( peer->blame );
364    tr_free( peer->client );
365
366    tr_bandwidthFree( peer->bandwidth );
367
368    tr_free( peer );
369}
370
371static void
372removePeer( Torrent * t,
373            tr_peer * peer )
374{
375    tr_peer *          removed;
376    struct peer_atom * atom;
377
378    assert( torrentIsLocked( t ) );
379
380    atom = getExistingAtom( t, &peer->addr );
381    assert( atom );
382    atom->time = time( NULL );
383
384    removed = tr_ptrArrayRemoveSorted( t->peers, peer, peerCompare );
385    assert( removed == peer );
386    peerDestructor( removed );
387}
388
389static void
390removeAllPeers( Torrent * t )
391{
392    while( !tr_ptrArrayEmpty( t->peers ) )
393        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
394}
395
396static void
397torrentDestructor( void * vt )
398{
399    Torrent * t = vt;
400    uint8_t   hash[SHA_DIGEST_LENGTH];
401
402    assert( t );
403    assert( !t->isRunning );
404    assert( t->peers );
405    assert( torrentIsLocked( t ) );
406    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
407    assert( tr_ptrArrayEmpty( t->peers ) );
408
409    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
410
411    tr_timerFree( &t->reconnectTimer );
412    tr_timerFree( &t->rechokeTimer );
413    tr_timerFree( &t->refillTimer );
414
415    tr_ptrArrayFree( t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
416    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
417    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
418    tr_ptrArrayFree( t->peers, NULL );
419
420    tr_free( t->pendingRequestCount );
421    tr_free( t );
422}
423
424static void peerCallbackFunc( void * vpeer,
425                              void * vevent,
426                              void * vt );
427
428static Torrent*
429torrentConstructor( tr_peerMgr * manager,
430                    tr_torrent * tor )
431{
432    int       i;
433    Torrent * t;
434
435    t = tr_new0( Torrent, 1 );
436    t->manager = manager;
437    t->tor = tor;
438    t->pool = tr_ptrArrayNew( );
439    t->peers = tr_ptrArrayNew( );
440    t->webseeds = tr_ptrArrayNew( );
441    t->outgoingHandshakes = tr_ptrArrayNew( );
442    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
443
444    for( i = 0; i < tor->info.webseedCount; ++i )
445    {
446        tr_webseed * w =
447            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc,
448                           t );
449        tr_ptrArrayAppend( t->webseeds, w );
450    }
451
452    return t;
453}
454
455
456static int bandwidthPulse( void * vmgr );
457
458
459tr_peerMgr*
460tr_peerMgrNew( tr_session * session )
461{
462    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
463
464    m->session = session;
465    m->torrents = tr_ptrArrayNew( );
466    m->incomingHandshakes = tr_ptrArrayNew( );
467    m->bandwidthTimer = tr_timerNew( session, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
468    return m;
469}
470
471void
472tr_peerMgrFree( tr_peerMgr * manager )
473{
474    managerLock( manager );
475
476    tr_timerFree( &manager->bandwidthTimer );
477
478    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
479     * the item from manager->handshakes, so this is a little roundabout... */
480    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
481        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
482
483    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
484
485    /* free the torrents. */
486    tr_ptrArrayFree( manager->torrents, torrentDestructor );
487
488    managerUnlock( manager );
489    tr_free( manager );
490}
491
492static tr_peer**
493getConnectedPeers( Torrent * t, int * setmeCount )
494{
495    int i, peerCount, connectionCount;
496    tr_peer **peers;
497    tr_peer **ret;
498
499    assert( torrentIsLocked( t ) );
500
501    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
502    ret = tr_new( tr_peer *, peerCount );
503
504    for( i = connectionCount = 0; i < peerCount; ++i )
505        if( peers[i]->msgs )
506            ret[connectionCount++] = peers[i];
507
508    *setmeCount = connectionCount;
509    return ret;
510}
511
512static int
513clientIsDownloadingFrom( const tr_peer * peer )
514{
515    return peer->clientIsInterested && !peer->clientIsChoked;
516}
517
518static int
519clientIsUploadingTo( const tr_peer * peer )
520{
521    return peer->peerIsInterested && !peer->peerIsChoked;
522}
523
524/***
525****
526***/
527
528tr_bool
529tr_peerMgrPeerIsSeed( const tr_peerMgr   * mgr,
530                      const uint8_t      * torrentHash,
531                      const tr_address   * addr )
532{
533    tr_bool isSeed = FALSE;
534    const Torrent * t = NULL;
535    const struct peer_atom * atom = NULL;
536
537    t = getExistingTorrent( (tr_peerMgr*)mgr, torrentHash );
538    if( t )
539        atom = getExistingAtom( t, addr );
540    if( atom )
541        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
542
543    return isSeed;
544}
545
546/****
547*****
548*****  REFILL
549*****
550****/
551
552static void
553assertValidPiece( Torrent * t, tr_piece_index_t piece )
554{
555    assert( t );
556    assert( t->tor );
557    assert( piece < t->tor->info.pieceCount );
558}
559
560static int
561getPieceRequests( Torrent * t, tr_piece_index_t piece )
562{
563    assertValidPiece( t, piece );
564
565    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
566}
567
568static void
569incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
570{
571    assertValidPiece( t, piece );
572
573    if( t->pendingRequestCount == NULL )
574        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
575    t->pendingRequestCount[piece]++;
576}
577
578static void
579decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
580{
581    assertValidPiece( t, piece );
582
583    if( t->pendingRequestCount )
584        t->pendingRequestCount[piece]--;
585}
586
587struct tr_refill_piece
588{
589    tr_priority_t    priority;
590    uint32_t         piece;
591    uint32_t         peerCount;
592    int              random;
593    int              pendingRequestCount;
594    int              missingBlockCount;
595};
596
597static int
598compareRefillPiece( const void * aIn, const void * bIn )
599{
600    const struct tr_refill_piece * a = aIn;
601    const struct tr_refill_piece * b = bIn;
602
603    /* if one piece has a higher priority, it goes first */
604    if( a->priority != b->priority )
605        return a->priority > b->priority ? -1 : 1;
606
607    /* have a per-priority endgame */
608    if( a->pendingRequestCount != b->pendingRequestCount )
609        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
610
611    /* fewer missing pieces goes first */
612    if( a->missingBlockCount != b->missingBlockCount )
613        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
614
615    /* otherwise if one has fewer peers, it goes first */
616    if( a->peerCount != b->peerCount )
617        return a->peerCount < b->peerCount ? -1 : 1;
618
619    /* otherwise go with our random seed */
620    if( a->random != b->random )
621        return a->random < b->random ? -1 : 1;
622
623    return 0;
624}
625
626static tr_piece_index_t *
627getPreferredPieces( Torrent * t, tr_piece_index_t  * pieceCount )
628{
629    const tr_torrent  * tor = t->tor;
630    const tr_info     * inf = &tor->info;
631    tr_piece_index_t    i;
632    tr_piece_index_t    poolSize = 0;
633    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
634    int                 peerCount;
635    tr_peer**           peers;
636
637    assert( torrentIsLocked( t ) );
638
639    peers = getConnectedPeers( t, &peerCount );
640
641    /* make a list of the pieces that we want but don't have */
642    for( i = 0; i < inf->pieceCount; ++i )
643        if( !tor->info.pieces[i].dnd
644                && !tr_cpPieceIsComplete( tor->completion, i ) )
645            pool[poolSize++] = i;
646
647    /* sort the pool by which to request next */
648    if( poolSize > 1 )
649    {
650        tr_piece_index_t j;
651        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
652
653        for( j = 0; j < poolSize; ++j )
654        {
655            int k;
656            const tr_piece_index_t piece = pool[j];
657            struct tr_refill_piece * setme = p + j;
658
659            setme->piece = piece;
660            setme->priority = inf->pieces[piece].priority;
661            setme->peerCount = 0;
662            setme->random = tr_cryptoWeakRandInt( INT_MAX );
663            setme->pendingRequestCount = getPieceRequests( t, piece );
664            setme->missingBlockCount
665                         = tr_cpMissingBlocksInPiece( tor->completion, piece );
666
667            for( k = 0; k < peerCount; ++k )
668            {
669                const tr_peer * peer = peers[k];
670                if( peer->peerIsInterested
671                        && !peer->clientIsChoked
672                        && tr_bitfieldHas( peer->have, piece ) )
673                    ++setme->peerCount;
674            }
675        }
676
677        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
678               compareRefillPiece );
679
680        for( j = 0; j < poolSize; ++j )
681            pool[j] = p[j].piece;
682
683        tr_free( p );
684    }
685
686    tr_free( peers );
687
688    *pieceCount = poolSize;
689    return pool;
690}
691
692struct tr_blockIterator
693{
694    Torrent * t;
695    tr_block_index_t blockIndex, blockCount, *blocks;
696    tr_piece_index_t pieceIndex, pieceCount, *pieces;
697};
698
699static struct tr_blockIterator*
700blockIteratorNew( Torrent * t )
701{
702    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
703    i->t = t;
704    i->pieces = getPreferredPieces( t, &i->pieceCount );
705    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCount );
706    return i;
707}
708
709static int
710blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
711{
712    int found;
713    Torrent * t = i->t;
714    tr_torrent * tor = t->tor;
715
716    while( ( i->blockIndex == i->blockCount )
717        && ( i->pieceIndex < i->pieceCount ) )
718    {
719        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
720        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
721        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
722        tr_block_index_t block;
723
724        assert( index < tor->info.pieceCount );
725
726        i->blockCount = 0;
727        i->blockIndex = 0;
728        for( block=b; block!=e; ++block )
729            if( !tr_cpBlockIsComplete( tor->completion, block ) )
730                i->blocks[i->blockCount++] = block;
731    }
732
733    if(( found = ( i->blockIndex < i->blockCount )))
734        *setme = i->blocks[i->blockIndex++];
735
736    return found;
737}
738
739static void
740blockIteratorFree( struct tr_blockIterator * i )
741{
742    tr_free( i->blocks );
743    tr_free( i->pieces );
744    tr_free( i );
745}
746
747static tr_peer**
748getPeersUploadingToClient( Torrent * t,
749                           int *     setmeCount )
750{
751    int j;
752    int peerCount = 0;
753    int retCount = 0;
754    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
755    tr_peer ** ret = tr_new( tr_peer *, peerCount );
756
757    j = 0; /* this is a temporary test to make sure we walk through all the peers */
758    if( peerCount )
759    {
760        /* Get a list of peers we're downloading from.
761           Pick a different starting point each time so all peers
762           get a chance at being the first in line */
763        const int fencepost = tr_cryptoWeakRandInt( peerCount );
764        int i = fencepost;
765        do {
766            if( clientIsDownloadingFrom( peers[i] ) )
767                ret[retCount++] = peers[i];
768            i = ( i + 1 ) % peerCount;
769            ++j;
770        } while( i != fencepost );
771    }
772    assert( j == peerCount );
773    *setmeCount = retCount;
774    return ret;
775}
776
777static uint32_t
778getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
779{
780    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
781    const uint64_t blockPos = tor->blockSize * b;
782    assert( blockPos >= piecePos );
783    return (uint32_t)( blockPos - piecePos );
784}
785
786static int
787refillPulse( void * vtorrent )
788{
789    tr_block_index_t block;
790    int peerCount;
791    int webseedCount;
792    tr_peer ** peers;
793    tr_webseed ** webseeds;
794    struct tr_blockIterator * blockIterator;
795    Torrent * t = vtorrent;
796    tr_torrent * tor = t->tor;
797
798    if( !t->isRunning )
799        return TRUE;
800    if( tr_torrentIsSeed( t->tor ) )
801        return TRUE;
802
803    torrentLock( t );
804    tordbg( t, "Refilling Request Buffers..." );
805
806    blockIterator = blockIteratorNew( t );
807    peers = getPeersUploadingToClient( t, &peerCount );
808    webseedCount = tr_ptrArraySize( t->webseeds );
809    webseeds = tr_memdup( tr_ptrArrayBase( t->webseeds ),
810                          webseedCount * sizeof( tr_webseed* ) );
811
812    while( ( webseedCount || peerCount )
813        && blockIteratorNext( blockIterator, &block ) )
814    {
815        int j;
816        int handled = FALSE;
817
818        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
819        const uint32_t offset = getBlockOffsetInPiece( tor, block );
820        const uint32_t length = tr_torBlockCountBytes( tor, block );
821
822        /* find a peer who can ask for this block */
823        for( j=0; !handled && j<peerCount; )
824        {
825            const tr_addreq_t val = tr_peerMsgsAddRequest( peers[j]->msgs, index, offset, length, FALSE );
826            switch( val )
827            {
828                case TR_ADDREQ_FULL:
829                case TR_ADDREQ_CLIENT_CHOKED:
830                    peers[j] = peers[--peerCount];
831                    break;
832
833                case TR_ADDREQ_MISSING:
834                case TR_ADDREQ_DUPLICATE:
835                    ++j;
836                    break;
837
838                case TR_ADDREQ_OK:
839                    incrementPieceRequests( t, index );
840                    handled = TRUE;
841                    break;
842
843                default:
844                    assert( 0 && "unhandled value" );
845                    break;
846            }
847        }
848
849        /* maybe one of the webseeds can do it */
850        for( j=0; !handled && j<webseedCount; )
851        {
852            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j],
853                                                          index, offset, length );
854            switch( val )
855            {
856                case TR_ADDREQ_FULL:
857                    webseeds[j] = webseeds[--webseedCount];
858                    break;
859
860                case TR_ADDREQ_OK:
861                    incrementPieceRequests( t, index );
862                    handled = TRUE;
863                    break;
864
865                default:
866                    assert( 0 && "unhandled value" );
867                    break;
868            }
869        }
870    }
871
872    /* cleanup */
873    blockIteratorFree( blockIterator );
874    tr_free( webseeds );
875    tr_free( peers );
876
877    t->refillTimer = NULL;
878    torrentUnlock( t );
879    return FALSE;
880}
881
882static void
883broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
884{
885    int i, size;
886    tr_peer ** peers;
887
888    assert( torrentIsLocked( t ) );
889
890    peers = getConnectedPeers( t, &size );
891    for( i=0; i<size; ++i )
892        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
893    tr_free( peers );
894}
895
896static void
897addStrike( Torrent * t,
898           tr_peer * peer )
899{
900    tordbg( t, "increasing peer %s strike count to %d",
901            tr_peerIoAddrStr( &peer->addr,
902                              peer->port ), peer->strikes + 1 );
903
904    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
905    {
906        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
907        atom->myflags |= MYFLAG_BANNED;
908        peer->doPurge = 1;
909        tordbg( t, "banning peer %s",
910               tr_peerIoAddrStr( &atom->addr, atom->port ) );
911    }
912}
913
914static void
915gotBadPiece( Torrent *        t,
916             tr_piece_index_t pieceIndex )
917{
918    tr_torrent *   tor = t->tor;
919    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
920
921    tor->corruptCur += byteCount;
922    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
923}
924
925static void
926refillSoon( Torrent * t )
927{
928    if( t->refillTimer == NULL )
929        t->refillTimer = tr_timerNew( t->manager->session,
930                                      refillPulse, t,
931                                      REFILL_PERIOD_MSEC );
932}
933
934static void
935peerSuggestedPiece( Torrent            * t,
936                    tr_peer            * peer,
937                    tr_piece_index_t     pieceIndex,
938                    int                  isFastAllowed )
939{
940    assert( t );
941    assert( peer );
942    assert( peer->msgs );
943
944    /* is this a valid piece? */
945    if(  pieceIndex >= t->tor->info.pieceCount )
946        return;
947
948    /* don't ask for it if we've already got it */
949    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
950        return;
951
952    /* don't ask for it if they don't have it */
953    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
954        return;
955
956    /* don't ask for it if we're choked and it's not fast */
957    if( !isFastAllowed && peer->clientIsChoked )
958        return;
959
960    /* request the blocks that we don't have in this piece */
961    {
962        tr_block_index_t block;
963        const tr_torrent * tor = t->tor;
964        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
965        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
966
967        for( block=start; block<end; ++block )
968        {
969            if( !tr_cpBlockIsComplete( tor->completion, block ) )
970            {
971                const uint32_t offset = getBlockOffsetInPiece( tor, block );
972                const uint32_t length = tr_torBlockCountBytes( tor, block );
973                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length, TRUE );
974                incrementPieceRequests( t, pieceIndex );
975            }
976        }
977    }
978}
979
980static void
981peerCallbackFunc( void * vpeer,
982                  void * vevent,
983                  void * vt )
984{
985    tr_peer             * peer = vpeer; /* may be NULL if peer is a webseed */
986    Torrent             * t = vt;
987    const tr_peer_event * e = vevent;
988
989    torrentLock( t );
990
991    switch( e->eventType )
992    {
993        case TR_PEER_UPLOAD_ONLY:
994            /* update our atom */
995            if( peer ) {
996                struct peer_atom * a = getExistingAtom( t, &peer->addr );
997                a->uploadOnly = e->uploadOnly ? UPLOAD_ONLY_YES : UPLOAD_ONLY_NO;
998            }
999            break;
1000
1001        case TR_PEER_NEED_REQ:
1002            refillSoon( t );
1003            break;
1004
1005        case TR_PEER_CANCEL:
1006            decrementPieceRequests( t, e->pieceIndex );
1007            break;
1008
1009        case TR_PEER_PEER_GOT_DATA:
1010        {
1011            const time_t now = time( NULL );
1012            tr_torrent * tor = t->tor;
1013
1014            tor->activityDate = now;
1015
1016            if( e->wasPieceData )
1017                tor->uploadedCur += e->length;
1018
1019            /* update the stats */
1020            if( e->wasPieceData )
1021                tr_statsAddUploaded( tor->session, e->length );
1022
1023            /* update our atom */
1024            if( peer ) {
1025                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1026                if( e->wasPieceData )
1027                    a->piece_data_time = now;
1028            }
1029
1030            break;
1031        }
1032
1033        case TR_PEER_CLIENT_GOT_SUGGEST:
1034            if( peer )
1035                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1036            break;
1037
1038        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1039            if( peer )
1040                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1041            break;
1042
1043        case TR_PEER_CLIENT_GOT_DATA:
1044        {
1045            const time_t now = time( NULL );
1046            tr_torrent * tor = t->tor;
1047            tor->activityDate = now;
1048
1049            /* only add this to downloadedCur if we got it from a peer --
1050             * webseeds shouldn't count against our ratio.  As one tracker
1051             * admin put it, "Those pieces are downloaded directly from the
1052             * content distributor, not the peers, it is the tracker's job
1053             * to manage the swarms, not the web server and does not fit
1054             * into the jurisdiction of the tracker." */
1055            if( peer && e->wasPieceData )
1056                tor->downloadedCur += e->length;
1057
1058            /* update the stats */ 
1059            if( e->wasPieceData )
1060                tr_statsAddDownloaded( tor->session, e->length );
1061
1062            /* update our atom */
1063            if( peer ) {
1064                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1065                if( e->wasPieceData )
1066                    a->piece_data_time = now;
1067            }
1068
1069            break;
1070        }
1071
1072        case TR_PEER_PEER_PROGRESS:
1073        {
1074            if( peer )
1075            {
1076                struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1077                const int peerIsSeed = e->progress >= 1.0;
1078                if( peerIsSeed ) {
1079                    tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1080                    atom->flags |= ADDED_F_SEED_FLAG;
1081                } else {
1082                    tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1083                    atom->flags &= ~ADDED_F_SEED_FLAG;
1084                }
1085            }
1086            break;
1087        }
1088
1089        case TR_PEER_CLIENT_GOT_BLOCK:
1090        {
1091            tr_torrent *     tor = t->tor;
1092
1093            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1094
1095            tr_cpBlockAdd( tor->completion, block );
1096            decrementPieceRequests( t, e->pieceIndex );
1097
1098            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1099
1100            if( tr_cpPieceIsComplete( tor->completion, e->pieceIndex ) )
1101            {
1102                const tr_piece_index_t p = e->pieceIndex;
1103                const tr_bool ok = tr_ioTestPiece( tor, p );
1104
1105                if( !ok )
1106                {
1107                    tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1108                               (unsigned long)p );
1109                }
1110
1111                tr_torrentSetHasPiece( tor, p, ok );
1112                tr_torrentSetPieceChecked( tor, p, TRUE );
1113                tr_peerMgrSetBlame( tor->session->peerMgr, tor->info.hash, p, ok );
1114
1115                if( !ok )
1116                    gotBadPiece( t, p );
1117                else
1118                {
1119                    int i, peerCount;
1120                    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1121                    for( i = 0; i < peerCount; ++i )
1122                        tr_peerMsgsHave( peers[i]->msgs, p );
1123                    tr_free( peers );
1124                }
1125
1126                tr_torrentRecheckCompleteness( tor );
1127            }
1128            break;
1129        }
1130
1131        case TR_PEER_ERROR:
1132            if( e->err == EINVAL )
1133            {
1134                addStrike( t, peer );
1135                peer->doPurge = 1;
1136            }
1137            else if( ( e->err == ERANGE )
1138                  || ( e->err == EMSGSIZE )
1139                  || ( e->err == ENOTCONN ) )
1140            {
1141                /* some protocol error from the peer */
1142                peer->doPurge = 1;
1143            }
1144            else /* a local error, such as an IO error */
1145            {
1146                t->tor->error = e->err;
1147                tr_strlcpy( t->tor->errorString,
1148                            tr_strerror( t->tor->error ),
1149                            sizeof( t->tor->errorString ) );
1150                tr_torrentStop( t->tor );
1151            }
1152            break;
1153
1154        default:
1155            assert( 0 );
1156    }
1157
1158    torrentUnlock( t );
1159}
1160
1161static void
1162ensureAtomExists( Torrent          * t,
1163                  const tr_address * addr,
1164                  tr_port            port,
1165                  uint8_t            flags,
1166                  uint8_t            from )
1167{
1168    if( getExistingAtom( t, addr ) == NULL )
1169    {
1170        struct peer_atom * a;
1171        a = tr_new0( struct peer_atom, 1 );
1172        a->addr = *addr;
1173        a->port = port;
1174        a->flags = flags;
1175        a->from = from;
1176        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr( &a->addr, a->port ) );
1177        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
1178    }
1179}
1180
1181static int
1182getMaxPeerCount( const tr_torrent * tor )
1183{
1184    return tor->maxConnectedPeers;
1185}
1186
1187static int
1188getPeerCount( const Torrent * t )
1189{
1190    return tr_ptrArraySize( t->peers ) + tr_ptrArraySize( t->outgoingHandshakes );
1191}
1192
1193/* FIXME: this is kind of a mess. */
1194static tr_bool
1195myHandshakeDoneCB( tr_handshake *  handshake,
1196                   tr_peerIo *     io,
1197                   int             isConnected,
1198                   const uint8_t * peer_id,
1199                   void *          vmanager )
1200{
1201    tr_bool            ok = isConnected;
1202    tr_bool            success = FALSE;
1203    tr_port            port;
1204    const tr_address * addr;
1205    tr_peerMgr       * manager = vmanager;
1206    Torrent          * t;
1207    tr_handshake     * ours;
1208
1209    assert( io );
1210    assert( isConnected == 0 || isConnected == 1 );
1211
1212    t = tr_peerIoHasTorrentHash( io )
1213        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1214        : NULL;
1215
1216    if( tr_peerIoIsIncoming ( io ) )
1217        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
1218                                        handshake, handshakeCompare );
1219    else if( t )
1220        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
1221                                        handshake, handshakeCompare );
1222    else
1223        ours = handshake;
1224
1225    assert( ours );
1226    assert( ours == handshake );
1227
1228    if( t )
1229        torrentLock( t );
1230
1231    addr = tr_peerIoGetAddress( io, &port );
1232
1233    if( !ok || !t || !t->isRunning )
1234    {
1235        if( t )
1236        {
1237            struct peer_atom * atom = getExistingAtom( t, addr );
1238            if( atom )
1239                ++atom->numFails;
1240        }
1241
1242        tr_peerIoFree( io );
1243    }
1244    else /* looking good */
1245    {
1246        struct peer_atom * atom;
1247        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1248        atom = getExistingAtom( t, addr );
1249        atom->time = time( NULL );
1250        atom->piece_data_time = 0;
1251
1252        if( atom->myflags & MYFLAG_BANNED )
1253        {
1254            tordbg( t, "banned peer %s tried to reconnect",
1255                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
1256            tr_peerIoFree( io );
1257        }
1258        else if( tr_peerIoIsIncoming( io )
1259               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1260
1261        {
1262            tr_peerIoFree( io );
1263        }
1264        else
1265        {
1266            tr_peer * peer = getExistingPeer( t, addr );
1267
1268            if( peer ) /* we already have this peer */
1269            {
1270                tr_peerIoFree( io );
1271            }
1272            else
1273            {
1274                peer = getPeer( t, addr );
1275                tr_free( peer->client );
1276
1277                if( !peer_id )
1278                    peer->client = NULL;
1279                else {
1280                    char client[128];
1281                    tr_clientForId( client, sizeof( client ), peer_id );
1282                    peer->client = tr_strdup( client );
1283                }
1284
1285                peer->port = port;
1286                peer->io = io;
1287                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1288                tr_peerIoSetBandwidth( io, peer->bandwidth );
1289
1290                success = TRUE;
1291            }
1292        }
1293    }
1294
1295    if( t )
1296        torrentUnlock( t );
1297
1298    return success;
1299}
1300
1301void
1302tr_peerMgrAddIncoming( tr_peerMgr * manager,
1303                       tr_address * addr,
1304                       tr_port      port,
1305                       int          socket )
1306{
1307    managerLock( manager );
1308
1309    if( tr_sessionIsAddressBlocked( manager->session, addr ) )
1310    {
1311        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
1312        tr_netClose( socket );
1313    }
1314    else if( getExistingHandshake( manager->incomingHandshakes, addr ) )
1315    {
1316        tr_netClose( socket );
1317    }
1318    else /* we don't have a connetion to them yet... */
1319    {
1320        tr_peerIo *    io;
1321        tr_handshake * handshake;
1322
1323        io = tr_peerIoNewIncoming( manager->session, addr, port, socket );
1324
1325        handshake = tr_handshakeNew( io,
1326                                     manager->session->encryptionMode,
1327                                     myHandshakeDoneCB,
1328                                     manager );
1329
1330        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake,
1331                                 handshakeCompare );
1332    }
1333
1334    managerUnlock( manager );
1335}
1336
1337void
1338tr_peerMgrAddPex( tr_peerMgr *    manager,
1339                  const uint8_t * torrentHash,
1340                  uint8_t         from,
1341                  const tr_pex *  pex )
1342{
1343    Torrent * t;
1344
1345    managerLock( manager );
1346
1347    t = getExistingTorrent( manager, torrentHash );
1348    if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
1349        ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from );
1350
1351    managerUnlock( manager );
1352}
1353
1354tr_pex *
1355tr_peerMgrCompactToPex( const void *    compact,
1356                        size_t          compactLen,
1357                        const uint8_t * added_f,
1358                        size_t          added_f_len,
1359                        size_t *        pexCount )
1360{
1361    size_t          i;
1362    size_t          n = compactLen / 6;
1363    const uint8_t * walk = compact;
1364    tr_pex *        pex = tr_new0( tr_pex, n );
1365
1366    for( i = 0; i < n; ++i )
1367    {
1368        pex[i].addr.type = TR_AF_INET;
1369        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
1370        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1371        if( added_f && ( n == added_f_len ) )
1372            pex[i].flags = added_f[i];
1373    }
1374
1375    *pexCount = n;
1376    return pex;
1377}
1378
1379/**
1380***
1381**/
1382
1383void
1384tr_peerMgrSetBlame( tr_peerMgr *     manager,
1385                    const uint8_t *  torrentHash,
1386                    tr_piece_index_t pieceIndex,
1387                    int              success )
1388{
1389    if( !success )
1390    {
1391        int        peerCount, i;
1392        Torrent *  t = getExistingTorrent( manager, torrentHash );
1393        tr_peer ** peers;
1394
1395        assert( torrentIsLocked( t ) );
1396
1397        peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1398        for( i = 0; i < peerCount; ++i )
1399        {
1400            tr_peer * peer = peers[i];
1401            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1402            {
1403                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1404                        tr_peerIoAddrStr( &peer->addr, peer->port ),
1405                        pieceIndex, (int)peer->strikes + 1 );
1406                addStrike( t, peer );
1407            }
1408        }
1409    }
1410}
1411
1412int
1413tr_pexCompare( const void * va, const void * vb )
1414{
1415    const tr_pex * a = va;
1416    const tr_pex * b = vb;
1417    int i = tr_compareAddresses( &a->addr, &b->addr );
1418
1419    if( i ) return i;
1420    if( a->port < b->port ) return -1;
1421    if( a->port > b->port ) return 1;
1422    return 0;
1423}
1424
1425static int
1426peerPrefersCrypto( const tr_peer * peer )
1427{
1428    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1429        return TRUE;
1430
1431    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1432        return FALSE;
1433
1434    return tr_peerIoIsEncrypted( peer->io );
1435}
1436
1437int
1438tr_peerMgrGetPeers( tr_peerMgr *    manager,
1439                    const uint8_t * torrentHash,
1440                    tr_pex **       setme_pex )
1441{
1442    int peerCount = 0;
1443    const Torrent *  t;
1444
1445    managerLock( manager );
1446
1447    t = getExistingTorrent( manager, torrentHash );
1448    if( t == NULL )
1449    {
1450        *setme_pex = NULL;
1451    }
1452    else
1453    {
1454        int i;
1455        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1456        tr_pex * pex = tr_new( tr_pex, peerCount );
1457        tr_pex * walk = pex;
1458
1459        for( i=0; i<peerCount; ++i, ++walk )
1460        {
1461            const tr_peer * peer = peers[i];
1462            const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1463
1464            walk->addr = peer->addr;
1465            walk->port = peer->port;
1466            walk->flags = 0;
1467            if( peerPrefersCrypto( peer ) )
1468                walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1469            if( ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 ) )
1470                walk->flags |= ADDED_F_SEED_FLAG;
1471        }
1472
1473        assert( ( walk - pex ) == peerCount );
1474        qsort( pex, peerCount, sizeof( tr_pex ), tr_pexCompare );
1475        *setme_pex = pex;
1476    }
1477
1478    managerUnlock( manager );
1479    return peerCount;
1480}
1481
1482static int reconnectPulse( void * vtorrent );
1483
1484static int rechokePulse( void * vtorrent );
1485
1486void
1487tr_peerMgrStartTorrent( tr_peerMgr *    manager,
1488                        const uint8_t * torrentHash )
1489{
1490    Torrent * t;
1491
1492    managerLock( manager );
1493
1494    t = getExistingTorrent( manager, torrentHash );
1495
1496    assert( t );
1497    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1498    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1499
1500    if( !t->isRunning )
1501    {
1502        t->isRunning = 1;
1503
1504        t->reconnectTimer = tr_timerNew( t->manager->session,
1505                                         reconnectPulse, t,
1506                                         RECONNECT_PERIOD_MSEC );
1507
1508        t->rechokeTimer = tr_timerNew( t->manager->session,
1509                                       rechokePulse, t,
1510                                       RECHOKE_PERIOD_MSEC );
1511
1512        reconnectPulse( t );
1513
1514        rechokePulse( t );
1515
1516        if( !tr_ptrArrayEmpty( t->webseeds ) )
1517            refillSoon( t );
1518    }
1519
1520    managerUnlock( manager );
1521}
1522
1523static void
1524stopTorrent( Torrent * t )
1525{
1526    assert( torrentIsLocked( t ) );
1527
1528    t->isRunning = 0;
1529    tr_timerFree( &t->rechokeTimer );
1530    tr_timerFree( &t->reconnectTimer );
1531
1532    /* disconnect the peers. */
1533    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)peerDestructor );
1534    tr_ptrArrayClear( t->peers );
1535
1536    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1537     * which removes the handshake from t->outgoingHandshakes... */
1538    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1539        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1540}
1541
1542void
1543tr_peerMgrStopTorrent( tr_peerMgr *    manager,
1544                       const uint8_t * torrentHash )
1545{
1546    managerLock( manager );
1547
1548    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1549
1550    managerUnlock( manager );
1551}
1552
1553void
1554tr_peerMgrAddTorrent( tr_peerMgr * manager,
1555                      tr_torrent * tor )
1556{
1557    Torrent * t;
1558
1559    managerLock( manager );
1560
1561    assert( tor );
1562    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1563
1564    t = torrentConstructor( manager, tor );
1565    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1566
1567    managerUnlock( manager );
1568}
1569
1570void
1571tr_peerMgrRemoveTorrent( tr_peerMgr *    manager,
1572                         const uint8_t * torrentHash )
1573{
1574    Torrent * t;
1575
1576    managerLock( manager );
1577
1578    t = getExistingTorrent( manager, torrentHash );
1579    assert( t );
1580    stopTorrent( t );
1581    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1582    torrentDestructor( t );
1583
1584    managerUnlock( manager );
1585}
1586
1587void
1588tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1589                               const uint8_t *    torrentHash,
1590                               int8_t *           tab,
1591                               unsigned int       tabCount )
1592{
1593    tr_piece_index_t   i;
1594    const Torrent *    t;
1595    const tr_torrent * tor;
1596    float              interval;
1597    int                isSeed;
1598    int                peerCount;
1599    const tr_peer **   peers;
1600
1601    managerLock( manager );
1602
1603    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1604    tor = t->tor;
1605    interval = tor->info.pieceCount / (float)tabCount;
1606    isSeed = tor && ( tr_cpGetStatus ( tor->completion ) == TR_SEED );
1607    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1608
1609    memset( tab, 0, tabCount );
1610
1611    for( i = 0; tor && i < tabCount; ++i )
1612    {
1613        const int piece = i * interval;
1614
1615        if( isSeed || tr_cpPieceIsComplete( tor->completion, piece ) )
1616            tab[i] = -1;
1617        else if( peerCount )
1618        {
1619            int j;
1620            for( j = 0; j < peerCount; ++j )
1621                if( tr_bitfieldHas( peers[j]->have, i ) )
1622                    ++tab[i];
1623        }
1624    }
1625
1626    managerUnlock( manager );
1627}
1628
1629/* Returns the pieces that are available from peers */
1630tr_bitfield*
1631tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1632                        const uint8_t *    torrentHash )
1633{
1634    int           i, size;
1635    Torrent *     t;
1636    tr_peer **    peers;
1637    tr_bitfield * pieces;
1638    managerLock( manager );
1639
1640    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1641    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1642    peers = getConnectedPeers( t, &size );
1643    for( i = 0; i < size; ++i )
1644        tr_bitfieldOr( pieces, peers[i]->have );
1645
1646    managerUnlock( manager );
1647    tr_free( peers );
1648    return pieces;
1649}
1650
1651int
1652tr_peerMgrHasConnections( const tr_peerMgr * manager,
1653                          const uint8_t *    torrentHash )
1654{
1655    int ret;
1656    const Torrent * t;
1657    managerLock( manager );
1658
1659    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1660    ret = t && ( !tr_ptrArrayEmpty( t->peers ) || !tr_ptrArrayEmpty( t->webseeds ) );
1661
1662    managerUnlock( manager );
1663    return ret;
1664}
1665
1666void
1667tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1668                        const uint8_t    * torrentHash,
1669                        int              * setmePeersKnown,
1670                        int              * setmePeersConnected,
1671                        int              * setmeSeedsConnected,
1672                        int              * setmeWebseedsSendingToUs,
1673                        int              * setmePeersSendingToUs,
1674                        int              * setmePeersGettingFromUs,
1675                        int              * setmePeersFrom )
1676{
1677    int i, size;
1678    const Torrent * t;
1679    const tr_peer ** peers;
1680    const tr_webseed ** webseeds;
1681
1682    managerLock( manager );
1683
1684    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1685    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1686
1687    *setmePeersKnown           = tr_ptrArraySize( t->pool );
1688    *setmePeersConnected       = 0;
1689    *setmeSeedsConnected       = 0;
1690    *setmePeersGettingFromUs   = 0;
1691    *setmePeersSendingToUs     = 0;
1692    *setmeWebseedsSendingToUs  = 0;
1693
1694    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1695        setmePeersFrom[i] = 0;
1696
1697    for( i=0; i<size; ++i )
1698    {
1699        const tr_peer * peer = peers[i];
1700        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1701
1702        if( peer->io == NULL ) /* not connected */
1703            continue;
1704
1705        ++*setmePeersConnected;
1706
1707        ++setmePeersFrom[atom->from];
1708
1709        if( clientIsDownloadingFrom( peer ) )
1710            ++*setmePeersSendingToUs;
1711
1712        if( clientIsUploadingTo( peer ) )
1713            ++*setmePeersGettingFromUs;
1714
1715        if( atom->flags & ADDED_F_SEED_FLAG )
1716            ++*setmeSeedsConnected;
1717    }
1718
1719    webseeds = (const tr_webseed **) tr_ptrArrayPeek( t->webseeds, &size );
1720    for( i=0; i<size; ++i )
1721    {
1722        if( tr_webseedIsActive( webseeds[i] ) )
1723            ++*setmeWebseedsSendingToUs;
1724    }
1725
1726    managerUnlock( manager );
1727}
1728
1729float*
1730tr_peerMgrWebSpeeds( const tr_peerMgr * manager,
1731                     const uint8_t *    torrentHash )
1732{
1733    const Torrent * t;
1734    const tr_webseed ** webseeds;
1735    int i;
1736    int webseedCount;
1737    float * ret;
1738
1739    assert( manager );
1740    managerLock( manager );
1741
1742    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1743    webseeds = (const tr_webseed**) tr_ptrArrayPeek( t->webseeds,
1744                                                     &webseedCount );
1745    assert( webseedCount == t->tor->info.webseedCount );
1746    ret = tr_new0( float, webseedCount );
1747
1748    for( i=0; i<webseedCount; ++i )
1749        if( !tr_webseedGetSpeed( webseeds[i], &ret[i] ) )
1750            ret[i] = -1.0;
1751
1752    managerUnlock( manager );
1753    return ret;
1754}
1755
1756double
1757tr_peerGetPieceSpeed( const tr_peer * peer, tr_direction direction )
1758{
1759    assert( peer );
1760    assert( direction==TR_CLIENT_TO_PEER || direction==TR_PEER_TO_CLIENT );
1761
1762    return tr_bandwidthGetPieceSpeed( peer->bandwidth, direction );
1763}
1764
1765
1766struct tr_peer_stat *
1767tr_peerMgrPeerStats( const   tr_peerMgr  * manager,
1768                     const   uint8_t     * torrentHash,
1769                     int                 * setmeCount UNUSED )
1770{
1771    int             i, size;
1772    const Torrent * t;
1773    tr_peer **      peers;
1774    tr_peer_stat *  ret;
1775
1776    assert( manager );
1777    managerLock( manager );
1778
1779    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1780    peers = getConnectedPeers( (Torrent*)t, &size );
1781    ret = tr_new0( tr_peer_stat, size );
1782
1783    for( i = 0; i < size; ++i )
1784    {
1785        char *                   pch;
1786        const tr_peer *          peer = peers[i];
1787        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1788        tr_peer_stat *           stat = ret + i;
1789
1790        tr_ntop( &peer->addr, stat->addr, sizeof(stat->addr) );
1791        tr_strlcpy( stat->client, (peer->client ? peer->client : ""), sizeof(stat->client) );
1792        stat->port               = ntohs( peer->port );
1793        stat->from               = atom->from;
1794        stat->progress           = peer->progress;
1795        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1796        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER );
1797        stat->rateToClient       = tr_peerGetPieceSpeed( peer, TR_PEER_TO_CLIENT );
1798        stat->peerIsChoked       = peer->peerIsChoked;
1799        stat->peerIsInterested   = peer->peerIsInterested;
1800        stat->clientIsChoked     = peer->clientIsChoked;
1801        stat->clientIsInterested = peer->clientIsInterested;
1802        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1803        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1804        stat->isUploadingTo      = clientIsUploadingTo( peer );
1805        stat->isSeed             = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
1806
1807        pch = stat->flagStr;
1808        if( t->optimistic == peer ) *pch++ = 'O';
1809        if( stat->isDownloadingFrom ) *pch++ = 'D';
1810        else if( stat->clientIsInterested ) *pch++ = 'd';
1811        if( stat->isUploadingTo ) *pch++ = 'U';
1812        else if( stat->peerIsInterested ) *pch++ = 'u';
1813        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
1814        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1815        if( stat->isEncrypted ) *pch++ = 'E';
1816        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1817        if( stat->isIncoming ) *pch++ = 'I';
1818        *pch = '\0';
1819    }
1820
1821    *setmeCount = size;
1822    tr_free( peers );
1823
1824    managerUnlock( manager );
1825    return ret;
1826}
1827
1828/**
1829***
1830**/
1831
1832struct ChokeData
1833{
1834    tr_bool         doUnchoke;
1835    tr_bool         isInterested;
1836    tr_bool         isChoked;
1837    int             rate;
1838    tr_peer *       peer;
1839};
1840
1841static int
1842compareChoke( const void * va,
1843              const void * vb )
1844{
1845    const struct ChokeData * a = va;
1846    const struct ChokeData * b = vb;
1847
1848    if( a->rate != b->rate ) /* prefer higher overall speeds */
1849        return a->rate > b->rate ? -1 : 1;
1850
1851    if( a->isChoked != b->isChoked ) /* prefer unchoked */
1852        return a->isChoked ? 1 : -1;
1853
1854    return 0;
1855}
1856
1857static int
1858isNew( const tr_peer * peer )
1859{
1860    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1861}
1862
1863static int
1864isSame( const tr_peer * peer )
1865{
1866    return peer && peer->client && strstr( peer->client, "Transmission" );
1867}
1868
1869/**
1870***
1871**/
1872
1873static void
1874rechoke( Torrent * t )
1875{
1876    int                i, peerCount, size, unchokedInterested;
1877    tr_peer **         peers = getConnectedPeers( t, &peerCount );
1878    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1879    const int          chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
1880
1881    assert( torrentIsLocked( t ) );
1882
1883    /* sort the peers by preference and rate */
1884    for( i = 0, size = 0; i < peerCount; ++i )
1885    {
1886        tr_peer * peer = peers[i];
1887        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1888
1889        if( peer->progress >= 1.0 ) /* choke all seeds */
1890        {
1891            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1892        }
1893        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
1894        {
1895            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1896        }
1897        else if( chokeAll ) /* choke everyone if we're not uploading */
1898        {
1899            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1900        }
1901        else
1902        {
1903            struct ChokeData * n = &choke[size++];
1904            n->peer         = peer;
1905            n->isInterested = peer->peerIsInterested;
1906            n->isChoked     = peer->peerIsChoked;
1907            n->rate         = tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER ) * 1024;
1908        }
1909    }
1910
1911    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
1912
1913    /**
1914     * Reciprocation and number of uploads capping is managed by unchoking
1915     * the N peers which have the best upload rate and are interested.
1916     * This maximizes the client's download rate. These N peers are
1917     * referred to as downloaders, because they are interested in downloading
1918     * from the client.
1919     *
1920     * Peers which have a better upload rate (as compared to the downloaders)
1921     * but aren't interested get unchoked. If they become interested, the
1922     * downloader with the worst upload rate gets choked. If a client has
1923     * a complete file, it uses its upload rate rather than its download
1924     * rate to decide which peers to unchoke.
1925     */
1926    unchokedInterested = 0;
1927    for( i=0; i<size && unchokedInterested<MAX_UNCHOKED_PEERS; ++i ) {
1928        choke[i].doUnchoke = 1;
1929        if( choke[i].isInterested )
1930            ++unchokedInterested;
1931    }
1932
1933    /* optimistic unchoke */
1934    if( i < size )
1935    {
1936        int n;
1937        struct ChokeData * c;
1938        tr_ptrArray * randPool = tr_ptrArrayNew( );
1939
1940        for( ; i<size; ++i )
1941        {
1942            if( choke[i].isInterested )
1943            {
1944                const tr_peer * peer = choke[i].peer;
1945                int x = 1, y;
1946                if( isNew( peer ) ) x *= 3;
1947                if( isSame( peer ) ) x *= 3;
1948                for( y=0; y<x; ++y )
1949                    tr_ptrArrayAppend( randPool, &choke[i] );
1950            }
1951        }
1952
1953        if(( n = tr_ptrArraySize( randPool )))
1954        {
1955            c = tr_ptrArrayNth( randPool, tr_cryptoWeakRandInt( n ));
1956            c->doUnchoke = 1;
1957            t->optimistic = c->peer;
1958        }
1959
1960        tr_ptrArrayFree( randPool, NULL );
1961    }
1962
1963    for( i = 0; i < size; ++i )
1964        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1965
1966    /* cleanup */
1967    tr_free( choke );
1968    tr_free( peers );
1969}
1970
1971static int
1972rechokePulse( void * vtorrent )
1973{
1974    Torrent * t = vtorrent;
1975
1976    torrentLock( t );
1977    rechoke( t );
1978    torrentUnlock( t );
1979    return TRUE;
1980}
1981
1982/***
1983****
1984****  Life and Death
1985****
1986***/
1987
1988static int
1989shouldPeerBeClosed( const Torrent * t,
1990                    const tr_peer * peer,
1991                    int             peerCount )
1992{
1993    const tr_torrent *       tor = t->tor;
1994    const time_t             now = time( NULL );
1995    const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1996
1997    /* if it's marked for purging, close it */
1998    if( peer->doPurge )
1999    {
2000        tordbg( t, "purging peer %s because its doPurge flag is set",
2001                tr_peerIoAddrStr( &atom->addr, atom->port ) );
2002        return TRUE;
2003    }
2004
2005    /* if we're seeding and the peer has everything we have,
2006     * and enough time has passed for a pex exchange, then disconnect */
2007    if( tr_torrentIsSeed( tor ) )
2008    {
2009        int peerHasEverything;
2010        if( atom->flags & ADDED_F_SEED_FLAG )
2011            peerHasEverything = TRUE;
2012        else if( peer->progress < tr_cpPercentDone( tor->completion ) )
2013            peerHasEverything = FALSE;
2014        else {
2015            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( tor->completion ) );
2016            tr_bitfieldDifference( tmp, peer->have );
2017            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2018            tr_bitfieldFree( tmp );
2019        }
2020        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 ))) {
2021            tordbg( t, "purging peer %s because we're both seeds",
2022                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2023            return TRUE;
2024        }
2025    }
2026
2027    /* disconnect if it's been too long since piece data has been transferred.
2028     * this is on a sliding scale based on number of available peers... */
2029    {
2030        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2031        /* if we have >= relaxIfFewerThan, strictness is 100%.
2032         * if we have zero connections, strictness is 0% */
2033        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
2034            ? 1.0
2035            : peerCount / (float)relaxStrictnessIfFewerThanN;
2036        const int lo = MIN_UPLOAD_IDLE_SECS;
2037        const int hi = MAX_UPLOAD_IDLE_SECS;
2038        const int limit = hi - ( ( hi - lo ) * strictness );
2039        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
2040/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
2041        if( idleTime > limit ) {
2042            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
2043                       tr_peerIoAddrStr( &atom->addr, atom->port ), idleTime );
2044            return TRUE;
2045        }
2046    }
2047
2048    return FALSE;
2049}
2050
2051static tr_peer **
2052getPeersToClose( Torrent * t, int * setmeSize )
2053{
2054    int i, peerCount, outsize;
2055    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( t->peers, &peerCount );
2056    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2057
2058    assert( torrentIsLocked( t ) );
2059
2060    for( i = outsize = 0; i < peerCount; ++i )
2061        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
2062            ret[outsize++] = peers[i];
2063
2064    *setmeSize = outsize;
2065    return ret;
2066}
2067
2068static int
2069compareCandidates( const void * va,
2070                   const void * vb )
2071{
2072    const struct peer_atom * a = *(const struct peer_atom**) va;
2073    const struct peer_atom * b = *(const struct peer_atom**) vb;
2074
2075    /* <Charles> Here we would probably want to try reconnecting to
2076     * peers that had most recently given us data. Lots of users have
2077     * trouble with resets due to their routers and/or ISPs. This way we
2078     * can quickly recover from an unwanted reset. So we sort
2079     * piece_data_time in descending order.
2080     */
2081
2082    if( a->piece_data_time != b->piece_data_time )
2083        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2084
2085    if( a->numFails != b->numFails )
2086        return a->numFails < b->numFails ? -1 : 1;
2087
2088    if( a->time != b->time )
2089        return a->time < b->time ? -1 : 1;
2090
2091    /* all other things being equal, prefer peers whose
2092     * information comes from a more reliable source */
2093    if( a->from != b->from )
2094        return a->from < b->from ? -1 : 1;
2095
2096    return 0;
2097}
2098
2099static int
2100getReconnectIntervalSecs( const struct peer_atom * atom )
2101{
2102    int          sec;
2103    const time_t now = time( NULL );
2104
2105    /* if we were recently connected to this peer and transferring piece
2106     * data, try to reconnect to them sooner rather that later -- we don't
2107     * want network troubles to get in the way of a good peer. */
2108    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2109        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2110
2111    /* don't allow reconnects more often than our minimum */
2112    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2113        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2114
2115    /* otherwise, the interval depends on how many times we've tried
2116     * and failed to connect to the peer */
2117    else switch( atom->numFails ) {
2118        case 0: sec = 0; break;
2119        case 1: sec = 5; break;
2120        case 2: sec = 2 * 60; break;
2121        case 3: sec = 15 * 60; break;
2122        case 4: sec = 30 * 60; break;
2123        case 5: sec = 60 * 60; break;
2124        default: sec = 120 * 60; break;
2125    }
2126
2127    return sec;
2128}
2129
2130static struct peer_atom **
2131getPeerCandidates( Torrent * t, int * setmeSize )
2132{
2133    int                 i, atomCount, retCount;
2134    struct peer_atom ** atoms;
2135    struct peer_atom ** ret;
2136    const time_t        now = time( NULL );
2137    const int           seed = tr_torrentIsSeed( t->tor );
2138
2139    assert( torrentIsLocked( t ) );
2140
2141    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &atomCount );
2142    ret = tr_new( struct peer_atom*, atomCount );
2143    for( i = retCount = 0; i < atomCount; ++i )
2144    {
2145        int                interval;
2146        struct peer_atom * atom = atoms[i];
2147
2148        /* peer fed us too much bad data ... we only keep it around
2149         * now to weed it out in case someone sends it to us via pex */
2150        if( atom->myflags & MYFLAG_BANNED )
2151            continue;
2152
2153        /* peer was unconnectable before, so we're not going to keep trying.
2154         * this is needs a separate flag from `banned', since if they try
2155         * to connect to us later, we'll let them in */
2156        if( atom->myflags & MYFLAG_UNREACHABLE )
2157            continue;
2158
2159        /* we don't need two connections to the same peer... */
2160        if( peerIsInUse( t, &atom->addr ) )
2161            continue;
2162
2163        /* no need to connect if we're both seeds... */
2164        if( seed && ( ( atom->flags & ADDED_F_SEED_FLAG ) ||
2165                      ( atom->uploadOnly == UPLOAD_ONLY_YES ) ) )
2166            continue;
2167
2168        /* don't reconnect too often */
2169        interval = getReconnectIntervalSecs( atom );
2170        if( ( now - atom->time ) < interval )
2171        {
2172            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2173                    i, tr_peerIoAddrStr( &atom->addr, atom->port ), interval );
2174            continue;
2175        }
2176
2177        /* Don't connect to peers in our blocklist */
2178        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2179            continue;
2180
2181        ret[retCount++] = atom;
2182    }
2183
2184    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2185    *setmeSize = retCount;
2186    return ret;
2187}
2188
2189static int
2190reconnectPulse( void * vtorrent )
2191{
2192    Torrent *     t = vtorrent;
2193    static time_t prevTime = 0;
2194    static int    newConnectionsThisSecond = 0;
2195    time_t        now;
2196
2197    torrentLock( t );
2198
2199    now = time( NULL );
2200    if( prevTime != now )
2201    {
2202        prevTime = now;
2203        newConnectionsThisSecond = 0;
2204    }
2205
2206    if( !t->isRunning )
2207    {
2208        removeAllPeers( t );
2209    }
2210    else
2211    {
2212        int i, nCandidates, nBad;
2213        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
2214        struct tr_peer ** connections = getPeersToClose( t, &nBad );
2215
2216        if( nBad || nCandidates )
2217            tordbg( t, "reconnect pulse for [%s]: %d bad connections, "
2218                   "%d connection candidates, %d atoms, max per pulse is %d",
2219                   t->tor->info.name, nBad, nCandidates,
2220                   tr_ptrArraySize( t->pool ),
2221                   (int)MAX_RECONNECTIONS_PER_PULSE );
2222
2223        /* disconnect some peers.
2224           if we transferred piece data, then they might be good peers,
2225           so reset their `numFails' weight to zero.  otherwise we connected
2226           to them fruitlessly, so mark it as another fail */
2227        for( i = 0; i < nBad; ++i ) {
2228            tr_peer * peer = connections[i];
2229            struct peer_atom * atom = getExistingAtom( t, &peer->addr );
2230            if( atom->piece_data_time )
2231                atom->numFails = 0;
2232            else
2233                ++atom->numFails;
2234            tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
2235            removePeer( t, peer );
2236        }
2237
2238        /* add some new ones */
2239        for( i = 0;    ( i < nCandidates )
2240           && ( i < MAX_RECONNECTIONS_PER_PULSE )
2241           && ( getPeerCount( t ) < getMaxPeerCount( t->tor ) )
2242           && ( newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND ); ++i )
2243        {
2244            tr_peerMgr *       mgr = t->manager;
2245            struct peer_atom * atom = candidates[i];
2246            tr_peerIo *        io;
2247
2248            tordbg( t, "Starting an OUTGOING connection with %s",
2249                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2250
2251            io = tr_peerIoNewOutgoing( mgr->session, &atom->addr, atom->port, t->hash );
2252            if( io == NULL )
2253            {
2254                atom->myflags |= MYFLAG_UNREACHABLE;
2255            }
2256            else
2257            {
2258                tr_handshake * handshake = tr_handshakeNew( io,
2259                                                            mgr->session->encryptionMode,
2260                                                            myHandshakeDoneCB,
2261                                                            mgr );
2262
2263                assert( tr_peerIoGetTorrentHash( io ) );
2264
2265                ++newConnectionsThisSecond;
2266
2267                tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake,
2268                                         handshakeCompare );
2269            }
2270
2271            atom->time = time( NULL );
2272        }
2273
2274        /* cleanup */
2275        tr_free( connections );
2276        tr_free( candidates );
2277    }
2278
2279    torrentUnlock( t );
2280    return TRUE;
2281}
2282
2283/****
2284*****
2285*****  BANDWIDTH ALLOCATION
2286*****
2287****/
2288
2289static void
2290pumpAllPeers( tr_peerMgr * mgr )
2291{
2292    const int torrentCount = tr_ptrArraySize( mgr->torrents );
2293    int       i, j;
2294
2295    for( i=0; i<torrentCount; ++i )
2296    {
2297        Torrent * t = tr_ptrArrayNth( mgr->torrents, i );
2298        for( j=0; j<tr_ptrArraySize( t->peers ); ++j )
2299        {
2300            tr_peer * peer = tr_ptrArrayNth( t->peers, j );
2301            tr_peerMsgsPulse( peer->msgs );
2302        }
2303    }
2304}
2305
2306static int
2307bandwidthPulse( void * vmgr )
2308{
2309    tr_peerMgr * mgr = vmgr;
2310    managerLock( mgr );
2311
2312    pumpAllPeers( mgr );
2313    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
2314    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
2315    pumpAllPeers( mgr );
2316
2317    managerUnlock( mgr );
2318    return TRUE;
2319}
Note: See TracBrowser for help on using the repository browser.