source: branches/1.4x/libtransmission/peer-mgr.c @ 7354

Last change on this file since 7354 was 7354, checked in by charles, 12 years ago

(1.4x libT) fix bug which caused libtransmission to hold onto onto nonproductive peers for longer than it should've

  • Property svn:keywords set to Date Rev Author Id
File size: 65.4 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 7354 2008-12-11 07:11:23Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h> /* memcpy, memcmp, strstr */
16#include <stdlib.h> /* qsort */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "bandwidth.h"
23#include "bencode.h"
24#include "blocklist.h"
25#include "clients.h"
26#include "completion.h"
27#include "crypto.h"
28#include "handshake.h"
29#include "inout.h" /* tr_ioTestPiece */
30#include "net.h"
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-mgr-private.h"
34#include "peer-msgs.h"
35#include "ptrarray.h"
36#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
37#include "torrent.h"
38#include "trevent.h"
39#include "utils.h"
40#include "webseed.h"
41
42enum
43{
44    /* how frequently to change which peers are choked */
45    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
46
47    /* minimum interval for refilling peers' request lists */
48    REFILL_PERIOD_MSEC = 333,
49
50    /* when many peers are available, keep idle ones this long */
51    MIN_UPLOAD_IDLE_SECS = ( 30 ),
52
53    /* when few peers are available, keep idle ones this long */
54    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
55
56    /* how frequently to decide which peers live and die */
57    RECONNECT_PERIOD_MSEC = ( 2 * 1000 ),
58   
59    /* how frequently to reallocate bandwidth */
60    BANDWIDTH_PERIOD_MSEC = 250,
61
62    /* max # of peers to ask fer per torrent per reconnect pulse */
63    MAX_RECONNECTIONS_PER_PULSE = 4,
64
65    /* max number of peers to ask for per second overall.
66    * this throttle is to avoid overloading the router */
67    MAX_CONNECTIONS_PER_SECOND = 8,
68
69    /* number of unchoked peers per torrent.
70     * FIXME: this probably ought to be configurable */
71    MAX_UNCHOKED_PEERS = 14,
72
73    /* number of bad pieces a peer is allowed to send before we ban them */
74    MAX_BAD_PIECES_PER_PEER = 5,
75
76    /* use for bitwise operations w/peer_atom.myflags */
77    MYFLAG_BANNED = 1,
78
79    /* unreachable for now... but not banned.
80     * if they try to connect to us it's okay */
81    MYFLAG_UNREACHABLE = 2,
82
83    /* the minimum we'll wait before attempting to reconnect to a peer */
84    MINIMUM_RECONNECT_INTERVAL_SECS = 5
85};
86
87
88/**
89***
90**/
91
92/* We keep one of these for every peer we know about, whether
93 * it's connected or not, so the struct must be small.
94 * When our current connections underperform, we dip back
95 * into this list for new ones. */
96struct peer_atom
97{
98    uint8_t           from;
99    uint8_t           flags; /* these match the added_f flags */
100    uint8_t           myflags; /* flags that aren't defined in added_f */
101    uint16_t          port;
102    uint16_t          numFails;
103    struct in_addr    addr;
104    time_t            time; /* when the peer's connection status last changed */
105    time_t            piece_data_time;
106};
107
108typedef struct
109{
110    tr_bool         isRunning;
111
112    uint8_t         hash[SHA_DIGEST_LENGTH];
113    int         *   pendingRequestCount;
114    tr_ptrArray *   outgoingHandshakes; /* tr_handshake */
115    tr_ptrArray *   pool; /* struct peer_atom */
116    tr_ptrArray *   peers; /* tr_peer */
117    tr_ptrArray *   webseeds; /* tr_webseed */
118    tr_timer *      reconnectTimer;
119    tr_timer *      rechokeTimer;
120    tr_timer *      refillTimer;
121    tr_torrent *    tor;
122    tr_peer *       optimistic; /* the optimistic peer, or NULL if none */
123
124    struct tr_peerMgr * manager;
125}
126Torrent;
127
128struct tr_peerMgr
129{
130    tr_session      * session;
131    tr_ptrArray     * torrents; /* Torrent */
132    tr_ptrArray     * incomingHandshakes; /* tr_handshake */
133    tr_timer        * bandwidthTimer;
134};
135
136#define tordbg( t, ... ) \
137    do { \
138        if( tr_deepLoggingIsActive( ) ) \
139            tr_deepLog( __FILE__, __LINE__, t->tor->info.name, __VA_ARGS__ ); \
140    } while( 0 )
141
142#define dbgmsg( ... ) \
143    do { \
144        if( tr_deepLoggingIsActive( ) ) \
145            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
146    } while( 0 )
147
148/**
149***
150**/
151
152static void
153managerLock( const struct tr_peerMgr * manager )
154{
155    tr_globalLock( manager->session );
156}
157
158static void
159managerUnlock( const struct tr_peerMgr * manager )
160{
161    tr_globalUnlock( manager->session );
162}
163
164static void
165torrentLock( Torrent * torrent )
166{
167    managerLock( torrent->manager );
168}
169
170static void
171torrentUnlock( Torrent * torrent )
172{
173    managerUnlock( torrent->manager );
174}
175
176static int
177torrentIsLocked( const Torrent * t )
178{
179    return tr_globalIsLocked( t->manager->session );
180}
181
182/**
183***
184**/
185
186static int
187compareAddresses( const struct in_addr * a,
188                  const struct in_addr * b )
189{
190    if( a->s_addr != b->s_addr )
191        return a->s_addr < b->s_addr ? -1 : 1;
192
193    return 0;
194}
195
196static int
197handshakeCompareToAddr( const void * va,
198                        const void * vb )
199{
200    const tr_handshake * a = va;
201
202    return compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
203}
204
205static int
206handshakeCompare( const void * a,
207                  const void * b )
208{
209    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
210}
211
212static tr_handshake*
213getExistingHandshake( tr_ptrArray *          handshakes,
214                      const struct in_addr * in_addr )
215{
216    return tr_ptrArrayFindSorted( handshakes,
217                                  in_addr,
218                                  handshakeCompareToAddr );
219}
220
221static int
222comparePeerAtomToAddress( const void * va,
223                          const void * vb )
224{
225    const struct peer_atom * a = va;
226
227    return compareAddresses( &a->addr, vb );
228}
229
230static int
231comparePeerAtoms( const void * va,
232                  const void * vb )
233{
234    const struct peer_atom * b = vb;
235
236    return comparePeerAtomToAddress( va, &b->addr );
237}
238
239/**
240***
241**/
242
243static int
244torrentCompare( const void * va,
245                const void * vb )
246{
247    const Torrent * a = va;
248    const Torrent * b = vb;
249
250    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
251}
252
253static int
254torrentCompareToHash( const void * va,
255                      const void * vb )
256{
257    const Torrent * a = va;
258    const uint8_t * b_hash = vb;
259
260    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
261}
262
263static Torrent*
264getExistingTorrent( tr_peerMgr *    manager,
265                    const uint8_t * hash )
266{
267    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
268                                             hash,
269                                             torrentCompareToHash );
270}
271
272static int
273peerCompare( const void * va,
274             const void * vb )
275{
276    const tr_peer * a = va;
277    const tr_peer * b = vb;
278
279    return compareAddresses( &a->in_addr, &b->in_addr );
280}
281
282static int
283peerCompareToAddr( const void * va,
284                   const void * vb )
285{
286    const tr_peer * a = va;
287
288    return compareAddresses( &a->in_addr, vb );
289}
290
291static tr_peer*
292getExistingPeer( Torrent *              torrent,
293                 const struct in_addr * in_addr )
294{
295    assert( torrentIsLocked( torrent ) );
296    assert( in_addr );
297
298    return tr_ptrArrayFindSorted( torrent->peers,
299                                  in_addr,
300                                  peerCompareToAddr );
301}
302
303static struct peer_atom*
304getExistingAtom( const                  Torrent * t,
305                 const struct in_addr * addr )
306{
307    assert( torrentIsLocked( t ) );
308    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
309}
310
311static int
312peerIsInUse( const Torrent *        ct,
313             const struct in_addr * addr )
314{
315    Torrent * t = (Torrent*) ct;
316
317    assert( torrentIsLocked ( t ) );
318
319    return getExistingPeer( t, addr )
320           || getExistingHandshake( t->outgoingHandshakes, addr )
321           || getExistingHandshake( t->manager->incomingHandshakes, addr );
322}
323
324static tr_peer*
325peerConstructor( tr_torrent * tor, const struct in_addr * in_addr )
326{
327    tr_peer * p;
328
329    p = tr_new0( tr_peer, 1 );
330    memcpy( &p->in_addr, in_addr, sizeof( struct in_addr ) );
331    p->bandwidth = tr_bandwidthNew( tor->session, tor->bandwidth );
332    return p;
333}
334
335static tr_peer*
336getPeer( Torrent *              torrent,
337         const struct in_addr * in_addr )
338{
339    tr_peer * peer;
340
341    assert( torrentIsLocked( torrent ) );
342
343    peer = getExistingPeer( torrent, in_addr );
344
345    if( peer == NULL )
346    {
347        peer = peerConstructor( torrent->tor, in_addr );
348        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
349    }
350
351    return peer;
352}
353
354static void
355peerDestructor( tr_peer * peer )
356{
357    assert( peer );
358    assert( peer->msgs );
359
360    tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
361    tr_peerMsgsFree( peer->msgs );
362
363    tr_peerIoFree( peer->io );
364
365    tr_bitfieldFree( peer->have );
366    tr_bitfieldFree( peer->blame );
367    tr_free( peer->client );
368
369    tr_bandwidthFree( peer->bandwidth );
370
371    tr_free( peer );
372}
373
374static void
375removePeer( Torrent * t,
376            tr_peer * peer )
377{
378    tr_peer *          removed;
379    struct peer_atom * atom;
380
381    assert( torrentIsLocked( t ) );
382
383    atom = getExistingAtom( t, &peer->in_addr );
384    assert( atom );
385    atom->time = time( NULL );
386
387    removed = tr_ptrArrayRemoveSorted( t->peers, peer, peerCompare );
388    assert( removed == peer );
389    peerDestructor( removed );
390}
391
392static void
393removeAllPeers( Torrent * t )
394{
395    while( !tr_ptrArrayEmpty( t->peers ) )
396        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
397}
398
399static void
400torrentDestructor( void * vt )
401{
402    Torrent * t = vt;
403    uint8_t   hash[SHA_DIGEST_LENGTH];
404
405    assert( t );
406    assert( !t->isRunning );
407    assert( t->peers );
408    assert( torrentIsLocked( t ) );
409    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
410    assert( tr_ptrArrayEmpty( t->peers ) );
411
412    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
413
414    tr_timerFree( &t->reconnectTimer );
415    tr_timerFree( &t->rechokeTimer );
416    tr_timerFree( &t->refillTimer );
417
418    tr_ptrArrayFree( t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
419    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
420    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
421    tr_ptrArrayFree( t->peers, NULL );
422
423    tr_free( t->pendingRequestCount );
424    tr_free( t );
425}
426
427static void peerCallbackFunc( void * vpeer,
428                              void * vevent,
429                              void * vt );
430
431static Torrent*
432torrentConstructor( tr_peerMgr * manager,
433                    tr_torrent * tor )
434{
435    int       i;
436    Torrent * t;
437
438    t = tr_new0( Torrent, 1 );
439    t->manager = manager;
440    t->tor = tor;
441    t->pool = tr_ptrArrayNew( );
442    t->peers = tr_ptrArrayNew( );
443    t->webseeds = tr_ptrArrayNew( );
444    t->outgoingHandshakes = tr_ptrArrayNew( );
445    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
446
447    for( i = 0; i < tor->info.webseedCount; ++i )
448    {
449        tr_webseed * w =
450            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc,
451                           t );
452        tr_ptrArrayAppend( t->webseeds, w );
453    }
454
455    return t;
456}
457
458/**
459 * For explanation, see http://www.bittorrent.org/fast_extensions.html
460 * Also see the "test-allowed-set" unit test
461 *
462 * @param k number of pieces in set
463 * @param sz number of pieces in the torrent
464 * @param infohash torrent's SHA1 hash
465 * @param ip peer's address
466 */
467struct tr_bitfield *
468tr_peerMgrGenerateAllowedSet(
469    const uint32_t         k,
470    const uint32_t         sz,
471    const                  uint8_t        *
472                           infohash,
473    const struct in_addr * ip )
474{
475    uint8_t       w[SHA_DIGEST_LENGTH + 4];
476    uint8_t       x[SHA_DIGEST_LENGTH];
477    tr_bitfield * a;
478    uint32_t      a_size;
479
480    *(uint32_t*)w = ntohl( htonl( ip->s_addr ) & 0xffffff00 );   /* (1) */
481    memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );              /* (2) */
482    tr_sha1( x, w, sizeof( w ), NULL );                        /* (3) */
483
484    a = tr_bitfieldNew( sz );
485    a_size = 0;
486
487    while( a_size < k )
488    {
489        int i;
490        for( i = 0; i < 5 && a_size < k; ++i )                      /* (4) */
491        {
492            uint32_t j = i * 4;                                /* (5) */
493            uint32_t y = ntohl( *( uint32_t* )( x + j ) );             /* (6) */
494            uint32_t index = y % sz;                           /* (7) */
495            if( !tr_bitfieldHas( a, index ) )                  /* (8) */
496            {
497                tr_bitfieldAdd( a, index );                    /* (9) */
498                ++a_size;
499            }
500        }
501        tr_sha1( x, x, sizeof( x ), NULL );                    /* (3) */
502    }
503
504    return a;
505}
506
507static int bandwidthPulse( void * vmgr );
508
509
510tr_peerMgr*
511tr_peerMgrNew( tr_session * session )
512{
513    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
514
515    m->session = session;
516    m->torrents = tr_ptrArrayNew( );
517    m->incomingHandshakes = tr_ptrArrayNew( );
518    m->bandwidthTimer = tr_timerNew( session, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
519    return m;
520}
521
522void
523tr_peerMgrFree( tr_peerMgr * manager )
524{
525    managerLock( manager );
526
527    tr_timerFree( &manager->bandwidthTimer );
528
529    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
530     * the item from manager->handshakes, so this is a little roundabout... */
531    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
532        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
533
534    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
535
536    /* free the torrents. */
537    tr_ptrArrayFree( manager->torrents, torrentDestructor );
538
539    managerUnlock( manager );
540    tr_free( manager );
541}
542
543static tr_peer**
544getConnectedPeers( Torrent * t,
545                   int *     setmeCount )
546{
547    int       i, peerCount, connectionCount;
548    tr_peer **peers;
549    tr_peer **ret;
550
551    assert( torrentIsLocked( t ) );
552
553    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
554    ret = tr_new( tr_peer *, peerCount );
555
556    for( i = connectionCount = 0; i < peerCount; ++i )
557        if( peers[i]->msgs )
558            ret[connectionCount++] = peers[i];
559
560    *setmeCount = connectionCount;
561    return ret;
562}
563
564static int
565clientIsDownloadingFrom( const tr_peer * peer )
566{
567    return peer->clientIsInterested && !peer->clientIsChoked;
568}
569
570static int
571clientIsUploadingTo( const tr_peer * peer )
572{
573    return peer->peerIsInterested && !peer->peerIsChoked;
574}
575
576/***
577****
578***/
579
580int
581tr_peerMgrPeerIsSeed( const tr_peerMgr *     mgr,
582                      const uint8_t *        torrentHash,
583                      const struct in_addr * addr )
584{
585    int                      isSeed = FALSE;
586    const Torrent *          t = NULL;
587    const struct peer_atom * atom = NULL;
588
589    t = getExistingTorrent( (tr_peerMgr*)mgr, torrentHash );
590    if( t )
591        atom = getExistingAtom( t, addr );
592    if( atom )
593        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
594
595    return isSeed;
596}
597
598/****
599*****
600*****  REFILL
601*****
602****/
603
604static void
605assertValidPiece( Torrent * t, tr_piece_index_t piece )
606{
607    assert( t );
608    assert( t->tor );
609    assert( piece < t->tor->info.pieceCount );
610}
611
612static int
613getPieceRequests( Torrent * t, tr_piece_index_t piece )
614{
615    assertValidPiece( t, piece );
616
617    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
618}
619
620static void
621incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
622{
623    assertValidPiece( t, piece );
624
625    if( t->pendingRequestCount == NULL )
626        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
627    t->pendingRequestCount[piece]++;
628}
629
630static void
631decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
632{
633    assertValidPiece( t, piece );
634
635    if( t->pendingRequestCount )
636        t->pendingRequestCount[piece]--;
637}
638
639struct tr_refill_piece
640{
641    tr_priority_t    priority;
642    uint32_t         piece;
643    uint32_t         peerCount;
644    int              random;
645    int              pendingRequestCount;
646    int              missingBlockCount;
647};
648
649static int
650compareRefillPiece( const void * aIn,
651                    const void * bIn )
652{
653    const struct tr_refill_piece * a = aIn;
654    const struct tr_refill_piece * b = bIn;
655
656    /* if one piece has a higher priority, it goes first */
657    if( a->priority != b->priority )
658        return a->priority > b->priority ? -1 : 1;
659
660    /* have a per-priority endgame */
661    if( a->pendingRequestCount != b->pendingRequestCount )
662        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
663
664    /* fewer missing pieces goes first */
665    if( a->missingBlockCount != b->missingBlockCount )
666        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
667
668    /* otherwise if one has fewer peers, it goes first */
669    if( a->peerCount != b->peerCount )
670        return a->peerCount < b->peerCount ? -1 : 1;
671
672    /* otherwise go with our random seed */
673    if( a->random != b->random )
674        return a->random < b->random ? -1 : 1;
675
676    return 0;
677}
678
679static tr_piece_index_t *
680getPreferredPieces( Torrent           * t,
681                    tr_piece_index_t  * pieceCount )
682{
683    const tr_torrent  * tor = t->tor;
684    const tr_info     * inf = &tor->info;
685    tr_piece_index_t    i;
686    tr_piece_index_t    poolSize = 0;
687    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
688    int                 peerCount;
689    tr_peer**           peers;
690
691    assert( torrentIsLocked( t ) );
692
693    peers = getConnectedPeers( t, &peerCount );
694
695    /* make a list of the pieces that we want but don't have */
696    for( i = 0; i < inf->pieceCount; ++i )
697        if( !tor->info.pieces[i].dnd
698                && !tr_cpPieceIsComplete( tor->completion, i ) )
699            pool[poolSize++] = i;
700
701    /* sort the pool by which to request next */
702    if( poolSize > 1 )
703    {
704        tr_piece_index_t j;
705        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
706
707        for( j = 0; j < poolSize; ++j )
708        {
709            int k;
710            const tr_piece_index_t piece = pool[j];
711            struct tr_refill_piece * setme = p + j;
712
713            setme->piece = piece;
714            setme->priority = inf->pieces[piece].priority;
715            setme->peerCount = 0;
716            setme->random = tr_cryptoWeakRandInt( INT_MAX );
717            setme->pendingRequestCount = getPieceRequests( t, piece );
718            setme->missingBlockCount
719                         = tr_cpMissingBlocksInPiece( tor->completion, piece );
720
721            for( k = 0; k < peerCount; ++k )
722            {
723                const tr_peer * peer = peers[k];
724                if( peer->peerIsInterested
725                        && !peer->clientIsChoked
726                        && tr_bitfieldHas( peer->have, piece ) )
727                    ++setme->peerCount;
728            }
729        }
730
731        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
732               compareRefillPiece );
733
734        for( j = 0; j < poolSize; ++j )
735            pool[j] = p[j].piece;
736
737        tr_free( p );
738    }
739
740    tr_free( peers );
741
742    *pieceCount = poolSize;
743    return pool;
744}
745
746struct tr_blockIterator
747{
748    Torrent * t;
749    tr_block_index_t blockIndex, blockCount, *blocks;
750    tr_piece_index_t pieceIndex, pieceCount, *pieces;
751};
752
753static struct tr_blockIterator*
754blockIteratorNew( Torrent * t )
755{
756    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
757    i->t = t;
758    i->pieces = getPreferredPieces( t, &i->pieceCount );
759    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCount );
760    return i;
761}
762
763static int
764blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
765{
766    int found;
767    Torrent * t = i->t;
768    tr_torrent * tor = t->tor;
769
770    while( ( i->blockIndex == i->blockCount )
771        && ( i->pieceIndex < i->pieceCount ) )
772    {
773        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
774        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
775        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
776        tr_block_index_t block;
777
778        assert( index < tor->info.pieceCount );
779
780        i->blockCount = 0;
781        i->blockIndex = 0;
782        for( block=b; block!=e; ++block )
783            if( !tr_cpBlockIsComplete( tor->completion, block ) )
784                i->blocks[i->blockCount++] = block;
785    }
786
787    if(( found = ( i->blockIndex < i->blockCount )))
788        *setme = i->blocks[i->blockIndex++];
789
790    return found;
791}
792
793static void
794blockIteratorFree( struct tr_blockIterator * i )
795{
796    tr_free( i->blocks );
797    tr_free( i->pieces );
798    tr_free( i );
799}
800
801static tr_peer**
802getPeersUploadingToClient( Torrent * t,
803                           int *     setmeCount )
804{
805    int j;
806    int peerCount = 0;
807    int retCount = 0;
808    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
809    tr_peer ** ret = tr_new( tr_peer *, peerCount );
810
811    j = 0; /* this is a temporary test to make sure we walk through all the peers */
812    if( peerCount )
813    {
814        /* Get a list of peers we're downloading from.
815           Pick a different starting point each time so all peers
816           get a chance at being the first in line */
817        const int fencepost = tr_cryptoWeakRandInt( peerCount );
818        int i = fencepost;
819        do {
820            if( clientIsDownloadingFrom( peers[i] ) )
821                ret[retCount++] = peers[i];
822            i = ( i + 1 ) % peerCount;
823            ++j;
824        } while( i != fencepost );
825    }
826    assert( j == peerCount );
827    *setmeCount = retCount;
828    return ret;
829}
830
831static uint32_t
832getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
833{
834    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
835    const uint64_t blockPos = tor->blockSize * b;
836    assert( blockPos >= piecePos );
837    return (uint32_t)( blockPos - piecePos );
838}
839
840static int
841refillPulse( void * vtorrent )
842{
843    tr_block_index_t block;
844    int peerCount;
845    int webseedCount;
846    tr_peer ** peers;
847    tr_webseed ** webseeds;
848    struct tr_blockIterator * blockIterator;
849    Torrent * t = vtorrent;
850    tr_torrent * tor = t->tor;
851
852    if( !t->isRunning )
853        return TRUE;
854    if( tr_torrentIsSeed( t->tor ) )
855        return TRUE;
856
857    torrentLock( t );
858    tordbg( t, "Refilling Request Buffers..." );
859
860    blockIterator = blockIteratorNew( t );
861    peers = getPeersUploadingToClient( t, &peerCount );
862    webseedCount = tr_ptrArraySize( t->webseeds );
863    webseeds = tr_memdup( tr_ptrArrayBase( t->webseeds ),
864                          webseedCount * sizeof( tr_webseed* ) );
865
866    while( ( webseedCount || peerCount )
867        && blockIteratorNext( blockIterator, &block ) )
868    {
869        int j;
870        int handled = FALSE;
871
872        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
873        const uint32_t offset = getBlockOffsetInPiece( tor, block );
874        const uint32_t length = tr_torBlockCountBytes( tor, block );
875
876        /* find a peer who can ask for this block */
877        for( j=0; !handled && j<peerCount; )
878        {
879            const int val = tr_peerMsgsAddRequest( peers[j]->msgs,
880                                                   index, offset, length );
881            switch( val )
882            {
883                case TR_ADDREQ_FULL:
884                case TR_ADDREQ_CLIENT_CHOKED:
885                    peers[j] = peers[--peerCount];
886                    break;
887
888                case TR_ADDREQ_MISSING:
889                case TR_ADDREQ_DUPLICATE:
890                    ++j;
891                    break;
892
893                case TR_ADDREQ_OK:
894                    incrementPieceRequests( t, index );
895                    handled = TRUE;
896                    break;
897
898                default:
899                    assert( 0 && "unhandled value" );
900                    break;
901            }
902        }
903
904        /* maybe one of the webseeds can do it */
905        for( j=0; !handled && j<webseedCount; )
906        {
907            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j],
908                                                          index, offset, length );
909            switch( val )
910            {
911                case TR_ADDREQ_FULL:
912                    webseeds[j] = webseeds[--webseedCount];
913                    break;
914
915                case TR_ADDREQ_OK:
916                    incrementPieceRequests( t, index );
917                    handled = TRUE;
918                    break;
919
920                default:
921                    assert( 0 && "unhandled value" );
922                    break;
923            }
924        }
925    }
926
927    /* cleanup */
928    blockIteratorFree( blockIterator );
929    tr_free( webseeds );
930    tr_free( peers );
931
932    t->refillTimer = NULL;
933    torrentUnlock( t );
934    return FALSE;
935}
936
937static void
938broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
939{
940    int i, size;
941    tr_peer ** peers;
942
943    assert( torrentIsLocked( t ) );
944
945    peers = getConnectedPeers( t, &size );
946    for( i=0; i<size; ++i )
947        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
948    tr_free( peers );
949}
950
951static void
952addStrike( Torrent * t,
953           tr_peer * peer )
954{
955    tordbg( t, "increasing peer %s strike count to %d",
956            tr_peerIoAddrStr( &peer->in_addr,
957                              peer->port ), peer->strikes + 1 );
958
959    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
960    {
961        struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
962        atom->myflags |= MYFLAG_BANNED;
963        peer->doPurge = 1;
964        tordbg( t, "banning peer %s",
965               tr_peerIoAddrStr( &atom->addr, atom->port ) );
966    }
967}
968
969static void
970gotBadPiece( Torrent *        t,
971             tr_piece_index_t pieceIndex )
972{
973    tr_torrent *   tor = t->tor;
974    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
975
976    tor->corruptCur += byteCount;
977    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
978}
979
980static void
981refillSoon( Torrent * t )
982{
983    if( t->refillTimer == NULL )
984        t->refillTimer = tr_timerNew( t->manager->session,
985                                      refillPulse, t,
986                                      REFILL_PERIOD_MSEC );
987}
988
989static void
990peerCallbackFunc( void * vpeer,
991                  void * vevent,
992                  void * vt )
993{
994    tr_peer *             peer = vpeer; /* may be NULL if peer is a webseed */
995    Torrent *             t = (Torrent *) vt;
996    const tr_peer_event * e = vevent;
997
998    torrentLock( t );
999
1000    switch( e->eventType )
1001    {
1002        case TR_PEER_NEED_REQ:
1003            refillSoon( t );
1004            break;
1005
1006        case TR_PEER_CANCEL:
1007            decrementPieceRequests( t, e->pieceIndex );
1008            break;
1009
1010        case TR_PEER_PEER_GOT_DATA:
1011        {
1012            const time_t now = time( NULL );
1013            tr_torrent * tor = t->tor;
1014
1015            tor->activityDate = now;
1016
1017            if( e->wasPieceData )
1018                tor->uploadedCur += e->length;
1019
1020            /* update the stats */
1021            if( e->wasPieceData )
1022                tr_statsAddUploaded( tor->session, e->length );
1023
1024            /* update our atom */
1025            if( peer ) {
1026                struct peer_atom * a = getExistingAtom( t, &peer->in_addr );
1027                if( e->wasPieceData )
1028                    a->piece_data_time = now;
1029            }
1030
1031            break;
1032        }
1033
1034        case TR_PEER_CLIENT_GOT_DATA:
1035        {
1036            const time_t now = time( NULL );
1037            tr_torrent * tor = t->tor;
1038
1039            tor->activityDate = now;
1040
1041            /* only add this to downloadedCur if we got it from a peer --
1042             * webseeds shouldn't count against our ratio.  As one tracker
1043             * admin put it, "Those pieces are downloaded directly from the
1044             * content distributor, not the peers, it is the tracker's job
1045             * to manage the swarms, not the web server and does not fit
1046             * into the jurisdiction of the tracker." */
1047            if( peer && e->wasPieceData )
1048                tor->downloadedCur += e->length;
1049
1050            /* update the stats */ 
1051            if( e->wasPieceData )
1052                tr_statsAddDownloaded( tor->session, e->length );
1053
1054            /* update our atom */
1055            if( peer ) {
1056                struct peer_atom * a = getExistingAtom( t, &peer->in_addr );
1057                if( e->wasPieceData )
1058                    a->piece_data_time = now;
1059            }
1060
1061            break;
1062        }
1063
1064        case TR_PEER_PEER_PROGRESS:
1065        {
1066            if( peer )
1067            {
1068                struct peer_atom * atom = getExistingAtom( t,
1069                                                           &peer->in_addr );
1070                const int          peerIsSeed = e->progress >= 1.0;
1071                if( peerIsSeed )
1072                {
1073                    tordbg( t, "marking peer %s as a seed",
1074                           tr_peerIoAddrStr( &atom->addr,
1075                                             atom->port ) );
1076                    atom->flags |= ADDED_F_SEED_FLAG;
1077                }
1078                else
1079                {
1080                    tordbg( t, "marking peer %s as a non-seed",
1081                           tr_peerIoAddrStr( &atom->addr,
1082                                             atom->port ) );
1083                    atom->flags &= ~ADDED_F_SEED_FLAG;
1084                }
1085            }
1086            break;
1087        }
1088
1089        case TR_PEER_CLIENT_GOT_BLOCK:
1090        {
1091            tr_torrent *     tor = t->tor;
1092
1093            tr_block_index_t block = _tr_block( tor, e->pieceIndex,
1094                                                e->offset );
1095
1096            tr_cpBlockAdd( tor->completion, block );
1097            decrementPieceRequests( t, e->pieceIndex );
1098
1099            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1100
1101            if( tr_cpPieceIsComplete( tor->completion, e->pieceIndex ) )
1102            {
1103                const tr_piece_index_t p = e->pieceIndex;
1104                const int              ok = tr_ioTestPiece( tor, p );
1105
1106                if( !ok )
1107                {
1108                    tr_torerr( tor,
1109                              _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1110                              (unsigned long)p );
1111                }
1112
1113                tr_torrentSetHasPiece( tor, p, ok );
1114                tr_torrentSetPieceChecked( tor, p, TRUE );
1115                tr_peerMgrSetBlame( tor->session->peerMgr, tor->info.hash, p, ok );
1116
1117                if( !ok )
1118                    gotBadPiece( t, p );
1119                else
1120                {
1121                    int        i, peerCount;
1122                    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1123                    for( i = 0; i < peerCount; ++i )
1124                        tr_peerMsgsHave( peers[i]->msgs, p );
1125                    tr_free( peers );
1126                }
1127
1128                tr_torrentRecheckCompleteness( tor );
1129            }
1130            break;
1131        }
1132
1133        case TR_PEER_ERROR:
1134            if( e->err == EINVAL )
1135            {
1136                addStrike( t, peer );
1137                peer->doPurge = 1;
1138            }
1139            else if( ( e->err == ERANGE )
1140                  || ( e->err == EMSGSIZE )
1141                  || ( e->err == ENOTCONN ) )
1142            {
1143                /* some protocol error from the peer */
1144                peer->doPurge = 1;
1145            }
1146            else /* a local error, such as an IO error */
1147            {
1148                t->tor->error = e->err;
1149                tr_strlcpy( t->tor->errorString,
1150                            tr_strerror( t->tor->error ),
1151                            sizeof( t->tor->errorString ) );
1152                tr_torrentStop( t->tor );
1153            }
1154            break;
1155
1156        default:
1157            assert( 0 );
1158    }
1159
1160    torrentUnlock( t );
1161}
1162
1163static void
1164ensureAtomExists( Torrent *              t,
1165                  const struct in_addr * addr,
1166                  uint16_t               port,
1167                  uint8_t                flags,
1168                  uint8_t                from )
1169{
1170    if( getExistingAtom( t, addr ) == NULL )
1171    {
1172        struct peer_atom * a;
1173        a = tr_new0( struct peer_atom, 1 );
1174        a->addr = *addr;
1175        a->port = port;
1176        a->flags = flags;
1177        a->from = from;
1178        tordbg( t, "got a new atom: %s",
1179               tr_peerIoAddrStr( &a->addr, a->port ) );
1180        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
1181    }
1182}
1183
1184static int
1185getMaxPeerCount( const tr_torrent * tor )
1186{
1187    return tor->maxConnectedPeers;
1188}
1189
1190static int
1191getPeerCount( const Torrent * t )
1192{
1193    return tr_ptrArraySize( t->peers ) + tr_ptrArraySize(
1194               t->outgoingHandshakes );
1195}
1196
1197/* FIXME: this is kind of a mess. */
1198static int
1199myHandshakeDoneCB( tr_handshake *  handshake,
1200                   tr_peerIo *     io,
1201                   int             isConnected,
1202                   const uint8_t * peer_id,
1203                   void *          vmanager )
1204{
1205    int                    ok = isConnected;
1206    int                    success = FALSE;
1207    uint16_t               port;
1208    const struct in_addr * addr;
1209    tr_peerMgr *           manager = (tr_peerMgr*) vmanager;
1210    Torrent *              t;
1211    tr_handshake *         ours;
1212
1213    assert( io );
1214    assert( isConnected == 0 || isConnected == 1 );
1215
1216    t = tr_peerIoHasTorrentHash( io )
1217        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1218        : NULL;
1219
1220    if( tr_peerIoIsIncoming ( io ) )
1221        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
1222                                        handshake, handshakeCompare );
1223    else if( t )
1224        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
1225                                        handshake, handshakeCompare );
1226    else
1227        ours = handshake;
1228
1229    assert( ours );
1230    assert( ours == handshake );
1231
1232    if( t )
1233        torrentLock( t );
1234
1235    addr = tr_peerIoGetAddress( io, &port );
1236
1237    if( !ok || !t || !t->isRunning )
1238    {
1239        if( t )
1240        {
1241            struct peer_atom * atom = getExistingAtom( t, addr );
1242            if( atom )
1243                ++atom->numFails;
1244        }
1245
1246        tr_peerIoFree( io );
1247    }
1248    else /* looking good */
1249    {
1250        struct peer_atom * atom;
1251        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1252        atom = getExistingAtom( t, addr );
1253        atom->time = time( NULL );
1254        atom->piece_data_time = 0;
1255
1256        if( atom->myflags & MYFLAG_BANNED )
1257        {
1258            tordbg( t, "banned peer %s tried to reconnect",
1259                   tr_peerIoAddrStr( &atom->addr,
1260                                     atom->port ) );
1261            tr_peerIoFree( io );
1262        }
1263        else if( tr_peerIoIsIncoming( io )
1264               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1265
1266        {
1267            tr_peerIoFree( io );
1268        }
1269        else
1270        {
1271            tr_peer * peer = getExistingPeer( t, addr );
1272
1273            if( peer ) /* we already have this peer */
1274            {
1275                tr_peerIoFree( io );
1276            }
1277            else
1278            {
1279                peer = getPeer( t, addr );
1280                tr_free( peer->client );
1281
1282                if( !peer_id )
1283                    peer->client = NULL;
1284                else {
1285                    char client[128];
1286                    tr_clientForId( client, sizeof( client ), peer_id );
1287                    peer->client = tr_strdup( client );
1288                }
1289
1290                peer->port = port;
1291                peer->io = io;
1292                peer->msgs = tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1293                tr_peerIoSetBandwidth( io, peer->bandwidth );
1294
1295                success = TRUE;
1296            }
1297        }
1298    }
1299
1300    if( t )
1301        torrentUnlock( t );
1302
1303    return success;
1304}
1305
1306void
1307tr_peerMgrAddIncoming( tr_peerMgr *     manager,
1308                       struct in_addr * addr,
1309                       uint16_t         port,
1310                       int              socket )
1311{
1312    managerLock( manager );
1313
1314    if( tr_sessionIsAddressBlocked( manager->session, addr ) )
1315    {
1316        tr_dbg( "Banned IP address \"%s\" tried to connect to us",
1317               inet_ntoa( *addr ) );
1318        tr_netClose( socket );
1319    }
1320    else if( getExistingHandshake( manager->incomingHandshakes, addr ) )
1321    {
1322        tr_netClose( socket );
1323    }
1324    else /* we don't have a connetion to them yet... */
1325    {
1326        tr_peerIo *    io;
1327        tr_handshake * handshake;
1328
1329        io = tr_peerIoNewIncoming( manager->session, addr, port, socket );
1330
1331        handshake = tr_handshakeNew( io,
1332                                     manager->session->encryptionMode,
1333                                     myHandshakeDoneCB,
1334                                     manager );
1335
1336        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake,
1337                                 handshakeCompare );
1338    }
1339
1340    managerUnlock( manager );
1341}
1342
1343void
1344tr_peerMgrAddPex( tr_peerMgr *    manager,
1345                  const uint8_t * torrentHash,
1346                  uint8_t         from,
1347                  const tr_pex *  pex )
1348{
1349    Torrent * t;
1350
1351    managerLock( manager );
1352
1353    t = getExistingTorrent( manager, torrentHash );
1354    if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->in_addr ) )
1355        ensureAtomExists( t, &pex->in_addr, pex->port, pex->flags, from );
1356
1357    managerUnlock( manager );
1358}
1359
1360tr_pex *
1361tr_peerMgrCompactToPex( const void *    compact,
1362                        size_t          compactLen,
1363                        const uint8_t * added_f,
1364                        size_t          added_f_len,
1365                        size_t *        pexCount )
1366{
1367    size_t          i;
1368    size_t          n = compactLen / 6;
1369    const uint8_t * walk = compact;
1370    tr_pex *        pex = tr_new0( tr_pex, n );
1371
1372    for( i = 0; i < n; ++i )
1373    {
1374        memcpy( &pex[i].in_addr, walk, 4 ); walk += 4;
1375        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1376        if( added_f && ( n == added_f_len ) )
1377            pex[i].flags = added_f[i];
1378    }
1379
1380    *pexCount = n;
1381    return pex;
1382}
1383
1384/**
1385***
1386**/
1387
1388void
1389tr_peerMgrSetBlame( tr_peerMgr *     manager,
1390                    const uint8_t *  torrentHash,
1391                    tr_piece_index_t pieceIndex,
1392                    int              success )
1393{
1394    if( !success )
1395    {
1396        int        peerCount, i;
1397        Torrent *  t = getExistingTorrent( manager, torrentHash );
1398        tr_peer ** peers;
1399
1400        assert( torrentIsLocked( t ) );
1401
1402        peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1403        for( i = 0; i < peerCount; ++i )
1404        {
1405            tr_peer * peer = peers[i];
1406            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1407            {
1408                tordbg(
1409                    t,
1410                    "peer %s contributed to corrupt piece (%d); now has %d strikes",
1411                    tr_peerIoAddrStr( &peer->in_addr, peer->port ),
1412                    pieceIndex, (int)peer->strikes + 1 );
1413                addStrike( t, peer );
1414            }
1415        }
1416    }
1417}
1418
1419int
1420tr_pexCompare( const void * va,
1421               const void * vb )
1422{
1423    const tr_pex * a = va;
1424    const tr_pex * b = vb;
1425    int            i =
1426        memcmp( &a->in_addr, &b->in_addr, sizeof( struct in_addr ) );
1427
1428    if( i ) return i;
1429    if( a->port < b->port ) return -1;
1430    if( a->port > b->port ) return 1;
1431    return 0;
1432}
1433
1434int tr_pexCompare( const void * a,
1435                   const void * b );
1436
1437static int
1438peerPrefersCrypto( const tr_peer * peer )
1439{
1440    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1441        return TRUE;
1442
1443    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1444        return FALSE;
1445
1446    return tr_peerIoIsEncrypted( peer->io );
1447}
1448
1449int
1450tr_peerMgrGetPeers( tr_peerMgr *    manager,
1451                    const uint8_t * torrentHash,
1452                    tr_pex **       setme_pex )
1453{
1454    int peerCount = 0;
1455    const Torrent *  t;
1456
1457    managerLock( manager );
1458
1459    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1460    if( !t )
1461    {
1462        *setme_pex = NULL;
1463    }
1464    else
1465    {
1466        int i;
1467        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1468        tr_pex * pex = tr_new( tr_pex, peerCount );
1469        tr_pex * walk = pex;
1470
1471        for( i = 0; i < peerCount; ++i, ++walk )
1472        {
1473            const tr_peer * peer = peers[i];
1474            walk->in_addr = peer->in_addr;
1475            walk->port = peer->port;
1476            walk->flags = 0;
1477            if( peerPrefersCrypto( peer ) ) walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1478            if( peer->progress >= 1.0 ) walk->flags |= ADDED_F_SEED_FLAG;
1479        }
1480
1481        assert( ( walk - pex ) == peerCount );
1482        qsort( pex, peerCount, sizeof( tr_pex ), tr_pexCompare );
1483        *setme_pex = pex;
1484    }
1485
1486    managerUnlock( manager );
1487    return peerCount;
1488}
1489
1490static int reconnectPulse( void * vtorrent );
1491
1492static int rechokePulse( void * vtorrent );
1493
1494void
1495tr_peerMgrStartTorrent( tr_peerMgr *    manager,
1496                        const uint8_t * torrentHash )
1497{
1498    Torrent * t;
1499
1500    managerLock( manager );
1501
1502    t = getExistingTorrent( manager, torrentHash );
1503
1504    assert( t );
1505    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1506    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1507
1508    if( !t->isRunning )
1509    {
1510        t->isRunning = 1;
1511
1512        t->reconnectTimer = tr_timerNew( t->manager->session,
1513                                         reconnectPulse, t,
1514                                         RECONNECT_PERIOD_MSEC );
1515
1516        t->rechokeTimer = tr_timerNew( t->manager->session,
1517                                       rechokePulse, t,
1518                                       RECHOKE_PERIOD_MSEC );
1519
1520        reconnectPulse( t );
1521
1522        rechokePulse( t );
1523
1524        if( !tr_ptrArrayEmpty( t->webseeds ) )
1525            refillSoon( t );
1526    }
1527
1528    managerUnlock( manager );
1529}
1530
1531static void
1532stopTorrent( Torrent * t )
1533{
1534    assert( torrentIsLocked( t ) );
1535
1536    t->isRunning = 0;
1537    tr_timerFree( &t->rechokeTimer );
1538    tr_timerFree( &t->reconnectTimer );
1539
1540    /* disconnect the peers. */
1541    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)peerDestructor );
1542    tr_ptrArrayClear( t->peers );
1543
1544    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1545     * which removes the handshake from t->outgoingHandshakes... */
1546    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1547        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1548}
1549
1550void
1551tr_peerMgrStopTorrent( tr_peerMgr *    manager,
1552                       const uint8_t * torrentHash )
1553{
1554    managerLock( manager );
1555
1556    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1557
1558    managerUnlock( manager );
1559}
1560
1561void
1562tr_peerMgrAddTorrent( tr_peerMgr * manager,
1563                      tr_torrent * tor )
1564{
1565    Torrent * t;
1566
1567    managerLock( manager );
1568
1569    assert( tor );
1570    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1571
1572    t = torrentConstructor( manager, tor );
1573    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1574
1575    managerUnlock( manager );
1576}
1577
1578void
1579tr_peerMgrRemoveTorrent( tr_peerMgr *    manager,
1580                         const uint8_t * torrentHash )
1581{
1582    Torrent * t;
1583
1584    managerLock( manager );
1585
1586    t = getExistingTorrent( manager, torrentHash );
1587    assert( t );
1588    stopTorrent( t );
1589    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1590    torrentDestructor( t );
1591
1592    managerUnlock( manager );
1593}
1594
1595void
1596tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1597                               const uint8_t *    torrentHash,
1598                               int8_t *           tab,
1599                               unsigned int       tabCount )
1600{
1601    tr_piece_index_t   i;
1602    const Torrent *    t;
1603    const tr_torrent * tor;
1604    float              interval;
1605    int                isComplete;
1606    int                peerCount;
1607    const tr_peer **   peers;
1608
1609    managerLock( manager );
1610
1611    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1612    tor = t->tor;
1613    interval = tor->info.pieceCount / (float)tabCount;
1614    isComplete = tor
1615                 && ( tr_cpGetStatus ( tor->completion ) == TR_CP_COMPLETE );
1616    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1617
1618    memset( tab, 0, tabCount );
1619
1620    for( i = 0; tor && i < tabCount; ++i )
1621    {
1622        const int piece = i * interval;
1623
1624        if( isComplete || tr_cpPieceIsComplete( tor->completion, piece ) )
1625            tab[i] = -1;
1626        else if( peerCount )
1627        {
1628            int j;
1629            for( j = 0; j < peerCount; ++j )
1630                if( tr_bitfieldHas( peers[j]->have, i ) )
1631                    ++tab[i];
1632        }
1633    }
1634
1635    managerUnlock( manager );
1636}
1637
1638/* Returns the pieces that are available from peers */
1639tr_bitfield*
1640tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1641                        const uint8_t *    torrentHash )
1642{
1643    int           i, size;
1644    Torrent *     t;
1645    tr_peer **    peers;
1646    tr_bitfield * pieces;
1647
1648    managerLock( manager );
1649
1650    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1651    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1652    peers = getConnectedPeers( t, &size );
1653    for( i = 0; i < size; ++i )
1654        tr_bitfieldOr( pieces, peers[i]->have );
1655
1656    managerUnlock( manager );
1657    tr_free( peers );
1658    return pieces;
1659}
1660
1661int
1662tr_peerMgrHasConnections( const tr_peerMgr * manager,
1663                          const uint8_t *    torrentHash )
1664{
1665    int             ret;
1666    const Torrent * t;
1667
1668    managerLock( manager );
1669
1670    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1671    ret = t && ( !tr_ptrArrayEmpty( t->peers )
1672               || !tr_ptrArrayEmpty( t->webseeds ) );
1673
1674    managerUnlock( manager );
1675    return ret;
1676}
1677
1678void
1679tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1680                        const uint8_t *    torrentHash,
1681                        int *              setmePeersKnown,
1682                        int *              setmePeersConnected,
1683                        int *              setmeSeedsConnected,
1684                        int *              setmeWebseedsSendingToUs,
1685                        int *              setmePeersSendingToUs,
1686                        int *              setmePeersGettingFromUs,
1687                        int *              setmePeersFrom )
1688{
1689    int                 i, size;
1690    const Torrent *     t;
1691    const tr_peer **    peers;
1692    const tr_webseed ** webseeds;
1693
1694    managerLock( manager );
1695
1696    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1697    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1698
1699    *setmePeersKnown           = tr_ptrArraySize( t->pool );
1700    *setmePeersConnected       = 0;
1701    *setmeSeedsConnected       = 0;
1702    *setmePeersGettingFromUs   = 0;
1703    *setmePeersSendingToUs     = 0;
1704    *setmeWebseedsSendingToUs  = 0;
1705
1706    for( i = 0; i < TR_PEER_FROM__MAX; ++i )
1707        setmePeersFrom[i] = 0;
1708
1709    for( i = 0; i < size; ++i )
1710    {
1711        const tr_peer *          peer = peers[i];
1712        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1713
1714        if( peer->io == NULL ) /* not connected */
1715            continue;
1716
1717        ++ * setmePeersConnected;
1718
1719        ++setmePeersFrom[atom->from];
1720
1721        if( clientIsDownloadingFrom( peer ) )
1722            ++ * setmePeersSendingToUs;
1723
1724        if( clientIsUploadingTo( peer ) )
1725            ++ * setmePeersGettingFromUs;
1726
1727        if( atom->flags & ADDED_F_SEED_FLAG )
1728            ++ * setmeSeedsConnected;
1729    }
1730
1731    webseeds = (const tr_webseed **) tr_ptrArrayPeek( t->webseeds, &size );
1732    for( i = 0; i < size; ++i )
1733    {
1734        if( tr_webseedIsActive( webseeds[i] ) )
1735            ++ * setmeWebseedsSendingToUs;
1736    }
1737
1738    managerUnlock( manager );
1739}
1740
1741float*
1742tr_peerMgrWebSpeeds( const tr_peerMgr * manager,
1743                     const uint8_t *    torrentHash )
1744{
1745    const Torrent *     t;
1746    const tr_webseed ** webseeds;
1747    int                 i;
1748    int                 webseedCount;
1749    float *             ret;
1750
1751    assert( manager );
1752    managerLock( manager );
1753
1754    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1755    webseeds = (const tr_webseed**) tr_ptrArrayPeek( t->webseeds,
1756                                                     &webseedCount );
1757    assert( webseedCount == t->tor->info.webseedCount );
1758    ret = tr_new0( float, webseedCount );
1759
1760    for( i = 0; i < webseedCount; ++i )
1761        if( !tr_webseedGetSpeed( webseeds[i], &ret[i] ) )
1762            ret[i] = -1.0;
1763
1764    managerUnlock( manager );
1765    return ret;
1766}
1767
1768double
1769tr_peerGetPieceSpeed( const tr_peer    * peer,
1770                      tr_direction       direction )
1771{
1772    assert( peer );
1773    assert( direction==TR_CLIENT_TO_PEER || direction==TR_PEER_TO_CLIENT );
1774
1775    return tr_bandwidthGetPieceSpeed( peer->bandwidth, direction );
1776}
1777
1778
1779struct tr_peer_stat *
1780tr_peerMgrPeerStats( const   tr_peerMgr  * manager,
1781                     const   uint8_t     * torrentHash,
1782                     int                 * setmeCount UNUSED )
1783{
1784    int             i, size;
1785    const Torrent * t;
1786    tr_peer **      peers;
1787    tr_peer_stat *  ret;
1788
1789    assert( manager );
1790    managerLock( manager );
1791
1792    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1793    peers = getConnectedPeers( (Torrent*)t, &size );
1794    ret = tr_new0( tr_peer_stat, size );
1795
1796    for( i = 0; i < size; ++i )
1797    {
1798        char *                   pch;
1799        const tr_peer *          peer = peers[i];
1800        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1801        tr_peer_stat *           stat = ret + i;
1802
1803        tr_netNtop( &peer->in_addr, stat->addr, sizeof( stat->addr ) );
1804        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
1805                   sizeof( stat->client ) );
1806        stat->port               = ntohs( peer->port );
1807        stat->from               = atom->from;
1808        stat->progress           = peer->progress;
1809        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1810        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER );
1811        stat->rateToClient       = tr_peerGetPieceSpeed( peer, TR_PEER_TO_CLIENT );
1812        stat->peerIsChoked       = peer->peerIsChoked;
1813        stat->peerIsInterested   = peer->peerIsInterested;
1814        stat->clientIsChoked     = peer->clientIsChoked;
1815        stat->clientIsInterested = peer->clientIsInterested;
1816        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1817        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1818        stat->isUploadingTo      = clientIsUploadingTo( peer );
1819
1820        pch = stat->flagStr;
1821        if( t->optimistic == peer ) *pch++ = 'O';
1822        if( stat->isDownloadingFrom ) *pch++ = 'D';
1823        else if( stat->clientIsInterested ) *pch++ = 'd';
1824        if( stat->isUploadingTo ) *pch++ = 'U';
1825        else if( stat->peerIsInterested ) *pch++ = 'u';
1826        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ =
1827                'K';
1828        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1829        if( stat->isEncrypted ) *pch++ = 'E';
1830        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1831        if( stat->isIncoming ) *pch++ = 'I';
1832        *pch = '\0';
1833    }
1834
1835    *setmeCount = size;
1836    tr_free( peers );
1837
1838    managerUnlock( manager );
1839    return ret;
1840}
1841
1842/**
1843***
1844**/
1845
1846struct ChokeData
1847{
1848    tr_bool         doUnchoke;
1849    tr_bool         isInterested;
1850    tr_bool         isChoked;
1851    int             rate;
1852    tr_peer *       peer;
1853};
1854
1855static int
1856compareChoke( const void * va,
1857              const void * vb )
1858{
1859    const struct ChokeData * a = va;
1860    const struct ChokeData * b = vb;
1861
1862    if( a->rate != b->rate ) /* prefer higher overall speeds */
1863        return a->rate > b->rate ? -1 : 1;
1864
1865    if( a->isChoked != b->isChoked ) /* prefer unchoked */
1866        return a->isChoked ? 1 : -1;
1867
1868    return 0;
1869}
1870
1871static int
1872isNew( const tr_peer * peer )
1873{
1874    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1875}
1876
1877static int
1878isSame( const tr_peer * peer )
1879{
1880    return peer && peer->client && strstr( peer->client, "Transmission" );
1881}
1882
1883/**
1884***
1885**/
1886
1887static void
1888rechoke( Torrent * t )
1889{
1890    int                i, peerCount, size, unchokedInterested;
1891    tr_peer **         peers = getConnectedPeers( t, &peerCount );
1892    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1893    const int          chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
1894
1895    assert( torrentIsLocked( t ) );
1896
1897    /* sort the peers by preference and rate */
1898    for( i = 0, size = 0; i < peerCount; ++i )
1899    {
1900        tr_peer * peer = peers[i];
1901        if( peer->progress >= 1.0 ) /* choke all seeds */
1902            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1903        else if( chokeAll )
1904            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1905        else {
1906            struct ChokeData * n = &choke[size++];
1907            n->peer         = peer;
1908            n->isInterested = peer->peerIsInterested;
1909            n->isChoked     = peer->peerIsChoked;
1910            n->rate         = tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER ) * 1024;
1911        }
1912    }
1913
1914    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
1915
1916    /**
1917     * Reciprocation and number of uploads capping is managed by unchoking
1918     * the N peers which have the best upload rate and are interested.
1919     * This maximizes the client's download rate. These N peers are
1920     * referred to as downloaders, because they are interested in downloading
1921     * from the client.
1922     *
1923     * Peers which have a better upload rate (as compared to the downloaders)
1924     * but aren't interested get unchoked. If they become interested, the
1925     * downloader with the worst upload rate gets choked. If a client has
1926     * a complete file, it uses its upload rate rather than its download
1927     * rate to decide which peers to unchoke.
1928     */
1929    unchokedInterested = 0;
1930    for( i = 0; i < size && unchokedInterested < MAX_UNCHOKED_PEERS; ++i )
1931    {
1932        choke[i].doUnchoke = 1;
1933        if( choke[i].isInterested )
1934            ++unchokedInterested;
1935    }
1936
1937    /* optimistic unchoke */
1938    if( i < size )
1939    {
1940        int                n;
1941        struct ChokeData * c;
1942        tr_ptrArray *      randPool = tr_ptrArrayNew( );
1943
1944        for( ; i < size; ++i )
1945        {
1946            if( choke[i].isInterested )
1947            {
1948                const tr_peer * peer = choke[i].peer;
1949                int             x = 1, y;
1950                if( isNew( peer ) ) x *= 3;
1951                if( isSame( peer ) ) x *= 3;
1952                for( y = 0; y < x; ++y )
1953                    tr_ptrArrayAppend( randPool, &choke[i] );
1954            }
1955        }
1956
1957        if( ( n = tr_ptrArraySize( randPool ) ) )
1958        {
1959            c = tr_ptrArrayNth( randPool, tr_cryptoWeakRandInt( n ) );
1960            c->doUnchoke = 1;
1961            t->optimistic = c->peer;
1962        }
1963
1964        tr_ptrArrayFree( randPool, NULL );
1965    }
1966
1967    for( i = 0; i < size; ++i )
1968        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1969
1970    /* cleanup */
1971    tr_free( choke );
1972    tr_free( peers );
1973}
1974
1975static int
1976rechokePulse( void * vtorrent )
1977{
1978    Torrent * t = vtorrent;
1979
1980    torrentLock( t );
1981    rechoke( t );
1982    torrentUnlock( t );
1983    return TRUE;
1984}
1985
1986/***
1987****
1988****  Life and Death
1989****
1990***/
1991
1992static int
1993shouldPeerBeClosed( const Torrent * t,
1994                    const tr_peer * peer,
1995                    int             peerCount )
1996{
1997    const tr_torrent *       tor = t->tor;
1998    const time_t             now = time( NULL );
1999    const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
2000
2001    /* if it's marked for purging, close it */
2002    if( peer->doPurge )
2003    {
2004        tordbg( t, "purging peer %s because its doPurge flag is set",
2005               tr_peerIoAddrStr( &atom->addr,
2006                                 atom->port ) );
2007        return TRUE;
2008    }
2009
2010    /* if we're seeding and the peer has everything we have,
2011     * and enough time has passed for a pex exchange, then disconnect */
2012    if( tr_torrentIsSeed( tor ) )
2013    {
2014        int peerHasEverything;
2015        if( atom->flags & ADDED_F_SEED_FLAG )
2016            peerHasEverything = TRUE;
2017        else if( peer->progress < tr_cpPercentDone( tor->completion ) )
2018            peerHasEverything = FALSE;
2019        else
2020        {
2021            tr_bitfield * tmp =
2022                tr_bitfieldDup( tr_cpPieceBitfield( tor->completion ) );
2023            tr_bitfieldDifference( tmp, peer->have );
2024            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2025            tr_bitfieldFree( tmp );
2026        }
2027        if( peerHasEverything
2028          && ( !tr_torrentAllowsPex( tor ) || ( now - atom->time >= 30 ) ) )
2029        {
2030            tordbg( t, "purging peer %s because we're both seeds",
2031                   tr_peerIoAddrStr( &atom->addr,
2032                                     atom->port ) );
2033            return TRUE;
2034        }
2035    }
2036
2037    /* disconnect if it's been too long since piece data has been transferred.
2038     * this is on a sliding scale based on number of available peers... */
2039    {
2040        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2041        /* if we have >= relaxIfFewerThan, strictness is 100%.
2042         * if we have zero connections, strictness is 0% */
2043        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
2044            ? 1.0
2045            : peerCount / (float)relaxStrictnessIfFewerThanN;
2046        const int lo = MIN_UPLOAD_IDLE_SECS;
2047        const int hi = MAX_UPLOAD_IDLE_SECS;
2048        const int limit = hi - ( ( hi - lo ) * strictness );
2049        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
2050/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
2051        if( idleTime > limit ) {
2052            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
2053                       tr_peerIoAddrStr( &atom->addr, atom->port ), idleTime );
2054            return TRUE;
2055        }
2056    }
2057
2058    return FALSE;
2059}
2060
2061static tr_peer **
2062getPeersToClose( Torrent * t,
2063                 int *     setmeSize )
2064{
2065    int               i, peerCount, outsize;
2066    tr_peer **        peers = (tr_peer**) tr_ptrArrayPeek( t->peers,
2067                                                           &peerCount );
2068    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2069
2070    assert( torrentIsLocked( t ) );
2071
2072    for( i = outsize = 0; i < peerCount; ++i )
2073        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
2074            ret[outsize++] = peers[i];
2075
2076    *setmeSize = outsize;
2077    return ret;
2078}
2079
2080static int
2081compareCandidates( const void * va,
2082                   const void * vb )
2083{
2084    const struct peer_atom * a = *(const struct peer_atom**) va;
2085    const struct peer_atom * b = *(const struct peer_atom**) vb;
2086
2087    /* <Charles> Here we would probably want to try reconnecting to
2088     * peers that had most recently given us data. Lots of users have
2089     * trouble with resets due to their routers and/or ISPs. This way we
2090     * can quickly recover from an unwanted reset. So we sort
2091     * piece_data_time in descending order.
2092     */
2093
2094    if( a->piece_data_time != b->piece_data_time )
2095        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2096
2097    if( a->numFails != b->numFails )
2098        return a->numFails < b->numFails ? -1 : 1;
2099
2100    if( a->time != b->time )
2101        return a->time < b->time ? -1 : 1;
2102
2103    return 0;
2104}
2105
2106static int
2107getReconnectIntervalSecs( const struct peer_atom * atom )
2108{
2109    int          sec;
2110    const time_t now = time( NULL );
2111
2112    /* if we were recently connected to this peer and transferring piece
2113     * data, try to reconnect to them sooner rather that later -- we don't
2114     * want network troubles to get in the way of a good peer. */
2115    if( ( now - atom->piece_data_time ) <=
2116       ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2117        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2118
2119    /* don't allow reconnects more often than our minimum */
2120    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2121        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2122
2123    /* otherwise, the interval depends on how many times we've tried
2124     * and failed to connect to the peer */
2125    else switch( atom->numFails )
2126        {
2127            case 0:
2128                sec = 0; break;
2129
2130            case 1:
2131                sec = 5; break;
2132
2133            case 2:
2134                sec = 2 * 60; break;
2135
2136            case 3:
2137                sec = 15 * 60; break;
2138
2139            case 4:
2140                sec = 30 * 60; break;
2141
2142            case 5:
2143                sec = 60 * 60; break;
2144
2145            default:
2146                sec = 120 * 60; break;
2147        }
2148
2149    return sec;
2150}
2151
2152static struct peer_atom **
2153getPeerCandidates(                               Torrent * t,
2154                                           int * setmeSize )
2155{
2156    int                 i, atomCount, retCount;
2157    struct peer_atom ** atoms;
2158    struct peer_atom ** ret;
2159    const time_t        now = time( NULL );
2160    const int           seed = tr_torrentIsSeed( t->tor );
2161
2162    assert( torrentIsLocked( t ) );
2163
2164    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &atomCount );
2165    ret = tr_new( struct peer_atom*, atomCount );
2166    for( i = retCount = 0; i < atomCount; ++i )
2167    {
2168        int                interval;
2169        struct peer_atom * atom = atoms[i];
2170
2171        /* peer fed us too much bad data ... we only keep it around
2172         * now to weed it out in case someone sends it to us via pex */
2173        if( atom->myflags & MYFLAG_BANNED )
2174            continue;
2175
2176        /* peer was unconnectable before, so we're not going to keep trying.
2177         * this is needs a separate flag from `banned', since if they try
2178         * to connect to us later, we'll let them in */
2179        if( atom->myflags & MYFLAG_UNREACHABLE )
2180            continue;
2181
2182        /* we don't need two connections to the same peer... */
2183        if( peerIsInUse( t, &atom->addr ) )
2184            continue;
2185
2186        /* no need to connect if we're both seeds... */
2187        if( seed && ( atom->flags & ADDED_F_SEED_FLAG ) )
2188            continue;
2189
2190        /* don't reconnect too often */
2191        interval = getReconnectIntervalSecs( atom );
2192        if( ( now - atom->time ) < interval )
2193        {
2194            tordbg(
2195                t,
2196                "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2197                i, tr_peerIoAddrStr( &atom->addr,
2198                                     atom->port ), interval );
2199            continue;
2200        }
2201
2202        /* Don't connect to peers in our blocklist */
2203        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2204            continue;
2205
2206        ret[retCount++] = atom;
2207    }
2208
2209    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2210    *setmeSize = retCount;
2211    return ret;
2212}
2213
2214static int
2215reconnectPulse( void * vtorrent )
2216{
2217    Torrent *     t = vtorrent;
2218    static time_t prevTime = 0;
2219    static int    newConnectionsThisSecond = 0;
2220    time_t        now;
2221
2222    torrentLock( t );
2223
2224    now = time( NULL );
2225    if( prevTime != now )
2226    {
2227        prevTime = now;
2228        newConnectionsThisSecond = 0;
2229    }
2230
2231    if( !t->isRunning )
2232    {
2233        removeAllPeers( t );
2234    }
2235    else
2236    {
2237        int                 i, nCandidates, nBad;
2238        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
2239        struct tr_peer **   connections = getPeersToClose( t, &nBad );
2240
2241        if( nBad || nCandidates )
2242            tordbg(
2243                t, "reconnect pulse for [%s]: %d bad connections, "
2244                   "%d connection candidates, %d atoms, max per pulse is %d",
2245                t->tor->info.name, nBad, nCandidates,
2246                tr_ptrArraySize( t->pool ),
2247                (int)MAX_RECONNECTIONS_PER_PULSE );
2248
2249        /* disconnect some peers.
2250           if we transferred piece data, then they might be good peers,
2251           so reset their `numFails' weight to zero.  otherwise we connected
2252           to them fruitlessly, so mark it as another fail */
2253        for( i = 0; i < nBad; ++i )
2254        {
2255            tr_peer *          peer = connections[i];
2256            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
2257            if( atom->piece_data_time )
2258                atom->numFails = 0;
2259            else
2260                ++atom->numFails;
2261            tordbg( t, "removing bad peer %s",
2262                   tr_peerIoGetAddrStr( peer->io ) );
2263            removePeer( t, peer );
2264        }
2265
2266        /* add some new ones */
2267        for( i = 0;    ( i < nCandidates )
2268           && ( i < MAX_RECONNECTIONS_PER_PULSE )
2269           && ( getPeerCount( t ) < getMaxPeerCount( t->tor ) )
2270           && ( newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND );
2271             ++i )
2272        {
2273            tr_peerMgr *       mgr = t->manager;
2274            struct peer_atom * atom = candidates[i];
2275            tr_peerIo *        io;
2276
2277            tordbg( t, "Starting an OUTGOING connection with %s",
2278                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2279
2280            io =
2281                tr_peerIoNewOutgoing( mgr->session, &atom->addr, atom->port,
2282                                      t->hash );
2283            if( io == NULL )
2284            {
2285                atom->myflags |= MYFLAG_UNREACHABLE;
2286            }
2287            else
2288            {
2289                tr_handshake * handshake = tr_handshakeNew(
2290                    io,
2291                    mgr->session->
2292                    encryptionMode,
2293                    myHandshakeDoneCB,
2294                    mgr );
2295
2296                assert( tr_peerIoGetTorrentHash( io ) );
2297
2298                ++newConnectionsThisSecond;
2299
2300                tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake,
2301                                         handshakeCompare );
2302            }
2303
2304            atom->time = time( NULL );
2305        }
2306
2307        /* cleanup */
2308        tr_free( connections );
2309        tr_free( candidates );
2310    }
2311
2312    torrentUnlock( t );
2313    return TRUE;
2314}
2315
2316/****
2317*****
2318*****  BANDWIDTH ALLOCATION
2319*****
2320****/
2321
2322static void
2323pumpAllPeers( tr_peerMgr * mgr )
2324{
2325    const int torrentCount = tr_ptrArraySize( mgr->torrents );
2326    int       i, j;
2327
2328    for( i=0; i<torrentCount; ++i )
2329    {
2330        Torrent * t = tr_ptrArrayNth( mgr->torrents, i );
2331        for( j=0; j<tr_ptrArraySize( t->peers ); ++j )
2332        {
2333            tr_peer * peer = tr_ptrArrayNth( t->peers, j );
2334            tr_peerMsgsPulse( peer->msgs );
2335        }
2336    }
2337}
2338
2339static int
2340bandwidthPulse( void * vmgr )
2341{
2342    tr_peerMgr * mgr = vmgr;
2343    managerLock( mgr );
2344
2345    pumpAllPeers( mgr );
2346    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
2347    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
2348    pumpAllPeers( mgr );
2349
2350    managerUnlock( mgr );
2351    return TRUE;
2352}
Note: See TracBrowser for help on using the repository browser.