source: trunk/libtransmission/peer-mgr.c @ 7241

Last change on this file since 7241 was 7241, checked in by charles, 13 years ago

(libT) add tr_peer_stat.isSeed

  • Property svn:keywords set to Date Rev Author Id
File size: 65.8 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 7241 2008-12-02 23:16:01Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h> /* memcpy, memcmp, strstr */
16#include <stdlib.h> /* qsort */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "bandwidth.h"
23#include "bencode.h"
24#include "blocklist.h"
25#include "clients.h"
26#include "completion.h"
27#include "crypto.h"
28#include "handshake.h"
29#include "inout.h" /* tr_ioTestPiece */
30#include "net.h"
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-mgr-private.h"
34#include "peer-msgs.h"
35#include "ptrarray.h"
36#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
37#include "torrent.h"
38#include "trevent.h"
39#include "utils.h"
40#include "webseed.h"
41
42enum
43{
44    /* how frequently to change which peers are choked */
45    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
46
47    /* minimum interval for refilling peers' request lists */
48    REFILL_PERIOD_MSEC = 333,
49
50    /* when many peers are available, keep idle ones this long */
51    MIN_UPLOAD_IDLE_SECS = ( 60 * 3 ),
52
53    /* when few peers are available, keep idle ones this long */
54    MAX_UPLOAD_IDLE_SECS = ( 60 * 10 ),
55
56    /* how frequently to decide which peers live and die */
57    RECONNECT_PERIOD_MSEC = ( 2 * 1000 ),
58   
59    /* how frequently to reallocate bandwidth */
60    BANDWIDTH_PERIOD_MSEC = 250,
61
62    /* max # of peers to ask fer per torrent per reconnect pulse */
63    MAX_RECONNECTIONS_PER_PULSE = 4,
64
65    /* max number of peers to ask for per second overall.
66    * this throttle is to avoid overloading the router */
67    MAX_CONNECTIONS_PER_SECOND = 8,
68
69    /* number of unchoked peers per torrent.
70     * FIXME: this probably ought to be configurable */
71    MAX_UNCHOKED_PEERS = 14,
72
73    /* number of bad pieces a peer is allowed to send before we ban them */
74    MAX_BAD_PIECES_PER_PEER = 5,
75
76    /* use for bitwise operations w/peer_atom.myflags */
77    MYFLAG_BANNED = 1,
78
79    /* unreachable for now... but not banned.
80     * if they try to connect to us it's okay */
81    MYFLAG_UNREACHABLE = 2,
82
83    /* the minimum we'll wait before attempting to reconnect to a peer */
84    MINIMUM_RECONNECT_INTERVAL_SECS = 5
85};
86
87
88/**
89***
90**/
91
92enum
93{
94    UPLOAD_ONLY_UKNOWN,
95    UPLOAD_ONLY_YES,
96    UPLOAD_ONLY_NO
97};
98
99/* We keep one of these for every peer we know about, whether
100 * it's connected or not, so the struct must be small.
101 * When our current connections underperform, we dip back
102 * into this list for new ones. */
103struct peer_atom
104{
105    uint8_t     from;
106    uint8_t     flags;       /* these match the added_f flags */
107    uint8_t     myflags;     /* flags that aren't defined in added_f */
108    uint8_t     uploadOnly;  /* UPLOAD_ONLY_ */
109    uint8_t     partialSeed;
110    tr_port     port;
111    uint16_t    numFails;
112    tr_address  addr;
113    time_t      time;        /* when the peer's connection status last changed */
114    time_t      piece_data_time;
115};
116
117typedef struct
118{
119    tr_bool         isRunning;
120
121    uint8_t         hash[SHA_DIGEST_LENGTH];
122    int         *   pendingRequestCount;
123    tr_ptrArray *   outgoingHandshakes; /* tr_handshake */
124    tr_ptrArray *   pool; /* struct peer_atom */
125    tr_ptrArray *   peers; /* tr_peer */
126    tr_ptrArray *   webseeds; /* tr_webseed */
127    tr_timer *      reconnectTimer;
128    tr_timer *      rechokeTimer;
129    tr_timer *      refillTimer;
130    tr_torrent *    tor;
131    tr_peer *       optimistic; /* the optimistic peer, or NULL if none */
132
133    struct tr_peerMgr * manager;
134}
135Torrent;
136
137struct tr_peerMgr
138{
139    tr_session      * session;
140    tr_ptrArray     * torrents; /* Torrent */
141    tr_ptrArray     * incomingHandshakes; /* tr_handshake */
142    tr_timer        * bandwidthTimer;
143};
144
145#define tordbg( t, ... ) \
146    do { \
147        if( tr_deepLoggingIsActive( ) ) \
148            tr_deepLog( __FILE__, __LINE__, t->tor->info.name, __VA_ARGS__ ); \
149    } while( 0 )
150
151#define dbgmsg( ... ) \
152    do { \
153        if( tr_deepLoggingIsActive( ) ) \
154            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
155    } while( 0 )
156
157/**
158***
159**/
160
161static void
162managerLock( const struct tr_peerMgr * manager )
163{
164    tr_globalLock( manager->session );
165}
166
167static void
168managerUnlock( const struct tr_peerMgr * manager )
169{
170    tr_globalUnlock( manager->session );
171}
172
173static void
174torrentLock( Torrent * torrent )
175{
176    managerLock( torrent->manager );
177}
178
179static void
180torrentUnlock( Torrent * torrent )
181{
182    managerUnlock( torrent->manager );
183}
184
185static int
186torrentIsLocked( const Torrent * t )
187{
188    return tr_globalIsLocked( t->manager->session );
189}
190
191/**
192***
193**/
194
195static int
196handshakeCompareToAddr( const void * va,
197                        const void * vb )
198{
199    const tr_handshake * a = va;
200
201    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
202}
203
204static int
205handshakeCompare( const void * a,
206                  const void * b )
207{
208    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
209}
210
211static tr_handshake*
212getExistingHandshake( tr_ptrArray      * handshakes,
213                      const tr_address * addr )
214{
215    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
216}
217
218static int
219comparePeerAtomToAddress( const void * va, const void * vb )
220{
221    const struct peer_atom * a = va;
222
223    return tr_compareAddresses( &a->addr, vb );
224}
225
226static int
227comparePeerAtoms( const void * va, const void * vb )
228{
229    const struct peer_atom * b = vb;
230
231    return comparePeerAtomToAddress( va, &b->addr );
232}
233
234/**
235***
236**/
237
238static int
239torrentCompare( const void * va,
240                const void * vb )
241{
242    const Torrent * a = va;
243    const Torrent * b = vb;
244
245    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
246}
247
248static int
249torrentCompareToHash( const void * va,
250                      const void * vb )
251{
252    const Torrent * a = va;
253    const uint8_t * b_hash = vb;
254
255    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
256}
257
258static Torrent*
259getExistingTorrent( tr_peerMgr *    manager,
260                    const uint8_t * hash )
261{
262    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
263                                             hash,
264                                             torrentCompareToHash );
265}
266
267static int
268peerCompare( const void * va, const void * vb )
269{
270    const tr_peer * a = va;
271    const tr_peer * b = vb;
272
273    return tr_compareAddresses( &a->addr, &b->addr );
274}
275
276static int
277peerCompareToAddr( const void * va, const void * vb )
278{
279    const tr_peer * a = va;
280
281    return tr_compareAddresses( &a->addr, vb );
282}
283
284static tr_peer*
285getExistingPeer( Torrent          * torrent,
286                 const tr_address * addr )
287{
288    assert( torrentIsLocked( torrent ) );
289    assert( addr );
290
291    return tr_ptrArrayFindSorted( torrent->peers, addr, peerCompareToAddr );
292}
293
294static struct peer_atom*
295getExistingAtom( const Torrent    * t,
296                 const tr_address * addr )
297{
298    assert( torrentIsLocked( t ) );
299    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
300}
301
302static int
303peerIsInUse( const Torrent    * ct,
304             const tr_address * addr )
305{
306    Torrent * t = (Torrent*) ct;
307
308    assert( torrentIsLocked ( t ) );
309
310    return getExistingPeer( t, addr )
311           || getExistingHandshake( t->outgoingHandshakes, addr )
312           || getExistingHandshake( t->manager->incomingHandshakes, addr );
313}
314
315static tr_peer*
316peerConstructor( tr_torrent * tor, const tr_address * addr )
317{
318    tr_peer * p;
319
320    p = tr_new0( tr_peer, 1 );
321    p->addr = *addr;
322    p->bandwidth = tr_bandwidthNew( tor->session, tor->bandwidth );
323    return p;
324}
325
326static tr_peer*
327getPeer( Torrent          * torrent,
328         const tr_address * addr )
329{
330    tr_peer * peer;
331
332    assert( torrentIsLocked( torrent ) );
333
334    peer = getExistingPeer( torrent, addr );
335
336    if( peer == NULL )
337    {
338        peer = peerConstructor( torrent->tor, addr );
339        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
340    }
341
342    return peer;
343}
344
345static void
346peerDestructor( tr_peer * peer )
347{
348    assert( peer );
349
350    if( peer->msgs != NULL )
351    {
352        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
353        tr_peerMsgsFree( peer->msgs );
354    }
355
356    tr_peerIoFree( peer->io );
357
358    tr_bitfieldFree( peer->have );
359    tr_bitfieldFree( peer->blame );
360    tr_free( peer->client );
361
362    tr_bandwidthFree( peer->bandwidth );
363
364    tr_free( peer );
365}
366
367static void
368removePeer( Torrent * t,
369            tr_peer * peer )
370{
371    tr_peer *          removed;
372    struct peer_atom * atom;
373
374    assert( torrentIsLocked( t ) );
375
376    atom = getExistingAtom( t, &peer->addr );
377    assert( atom );
378    atom->time = time( NULL );
379
380    removed = tr_ptrArrayRemoveSorted( t->peers, peer, peerCompare );
381    assert( removed == peer );
382    peerDestructor( removed );
383}
384
385static void
386removeAllPeers( Torrent * t )
387{
388    while( !tr_ptrArrayEmpty( t->peers ) )
389        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
390}
391
392static void
393torrentDestructor( void * vt )
394{
395    Torrent * t = vt;
396    uint8_t   hash[SHA_DIGEST_LENGTH];
397
398    assert( t );
399    assert( !t->isRunning );
400    assert( t->peers );
401    assert( torrentIsLocked( t ) );
402    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
403    assert( tr_ptrArrayEmpty( t->peers ) );
404
405    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
406
407    tr_timerFree( &t->reconnectTimer );
408    tr_timerFree( &t->rechokeTimer );
409    tr_timerFree( &t->refillTimer );
410
411    tr_ptrArrayFree( t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
412    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
413    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
414    tr_ptrArrayFree( t->peers, NULL );
415
416    tr_free( t->pendingRequestCount );
417    tr_free( t );
418}
419
420static void peerCallbackFunc( void * vpeer,
421                              void * vevent,
422                              void * vt );
423
424static Torrent*
425torrentConstructor( tr_peerMgr * manager,
426                    tr_torrent * tor )
427{
428    int       i;
429    Torrent * t;
430
431    t = tr_new0( Torrent, 1 );
432    t->manager = manager;
433    t->tor = tor;
434    t->pool = tr_ptrArrayNew( );
435    t->peers = tr_ptrArrayNew( );
436    t->webseeds = tr_ptrArrayNew( );
437    t->outgoingHandshakes = tr_ptrArrayNew( );
438    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
439
440    for( i = 0; i < tor->info.webseedCount; ++i )
441    {
442        tr_webseed * w =
443            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc,
444                           t );
445        tr_ptrArrayAppend( t->webseeds, w );
446    }
447
448    return t;
449}
450
451
452static int bandwidthPulse( void * vmgr );
453
454
455tr_peerMgr*
456tr_peerMgrNew( tr_session * session )
457{
458    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
459
460    m->session = session;
461    m->torrents = tr_ptrArrayNew( );
462    m->incomingHandshakes = tr_ptrArrayNew( );
463    m->bandwidthTimer = tr_timerNew( session, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
464    return m;
465}
466
467void
468tr_peerMgrFree( tr_peerMgr * manager )
469{
470    managerLock( manager );
471
472    tr_timerFree( &manager->bandwidthTimer );
473
474    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
475     * the item from manager->handshakes, so this is a little roundabout... */
476    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
477        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
478
479    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
480
481    /* free the torrents. */
482    tr_ptrArrayFree( manager->torrents, torrentDestructor );
483
484    managerUnlock( manager );
485    tr_free( manager );
486}
487
488static tr_peer**
489getConnectedPeers( Torrent * t,
490                   int *     setmeCount )
491{
492    int       i, peerCount, connectionCount;
493    tr_peer **peers;
494    tr_peer **ret;
495
496    assert( torrentIsLocked( t ) );
497
498    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
499    ret = tr_new( tr_peer *, peerCount );
500
501    for( i = connectionCount = 0; i < peerCount; ++i )
502        if( peers[i]->msgs )
503            ret[connectionCount++] = peers[i];
504
505    *setmeCount = connectionCount;
506    return ret;
507}
508
509static int
510clientIsDownloadingFrom( const tr_peer * peer )
511{
512    return peer->clientIsInterested && !peer->clientIsChoked;
513}
514
515static int
516clientIsUploadingTo( const tr_peer * peer )
517{
518    return peer->peerIsInterested && !peer->peerIsChoked;
519}
520
521/***
522****
523***/
524
525int
526tr_peerMgrPeerIsSeed( const tr_peerMgr * mgr,
527                      const uint8_t    * torrentHash,
528                      const tr_address * addr )
529{
530    int                      isSeed = FALSE;
531    const Torrent *          t = NULL;
532    const struct peer_atom * atom = NULL;
533
534    t = getExistingTorrent( (tr_peerMgr*)mgr, torrentHash );
535    if( t )
536        atom = getExistingAtom( t, addr );
537    if( atom )
538        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
539
540    return isSeed;
541}
542
543/****
544*****
545*****  REFILL
546*****
547****/
548
549static void
550assertValidPiece( Torrent * t, tr_piece_index_t piece )
551{
552    assert( t );
553    assert( t->tor );
554    assert( piece < t->tor->info.pieceCount );
555}
556
557static int
558getPieceRequests( Torrent * t, tr_piece_index_t piece )
559{
560    assertValidPiece( t, piece );
561
562    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
563}
564
565static void
566incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
567{
568    assertValidPiece( t, piece );
569
570    if( t->pendingRequestCount == NULL )
571        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
572    t->pendingRequestCount[piece]++;
573}
574
575static void
576decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
577{
578    assertValidPiece( t, piece );
579
580    if( t->pendingRequestCount )
581        t->pendingRequestCount[piece]--;
582}
583
584struct tr_refill_piece
585{
586    tr_priority_t    priority;
587    uint32_t         piece;
588    uint32_t         peerCount;
589    int              random;
590    int              pendingRequestCount;
591    int              missingBlockCount;
592};
593
594static int
595compareRefillPiece( const void * aIn,
596                    const void * bIn )
597{
598    const struct tr_refill_piece * a = aIn;
599    const struct tr_refill_piece * b = bIn;
600
601    /* if one piece has a higher priority, it goes first */
602    if( a->priority != b->priority )
603        return a->priority > b->priority ? -1 : 1;
604
605    /* have a per-priority endgame */
606    if( a->pendingRequestCount != b->pendingRequestCount )
607        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
608
609    /* fewer missing pieces goes first */
610    if( a->missingBlockCount != b->missingBlockCount )
611        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
612
613    /* otherwise if one has fewer peers, it goes first */
614    if( a->peerCount != b->peerCount )
615        return a->peerCount < b->peerCount ? -1 : 1;
616
617    /* otherwise go with our random seed */
618    if( a->random != b->random )
619        return a->random < b->random ? -1 : 1;
620
621    return 0;
622}
623
624static tr_piece_index_t *
625getPreferredPieces( Torrent           * t,
626                    tr_piece_index_t  * pieceCount )
627{
628    const tr_torrent  * tor = t->tor;
629    const tr_info     * inf = &tor->info;
630    tr_piece_index_t    i;
631    tr_piece_index_t    poolSize = 0;
632    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
633    int                 peerCount;
634    tr_peer**           peers;
635
636    assert( torrentIsLocked( t ) );
637
638    peers = getConnectedPeers( t, &peerCount );
639
640    /* make a list of the pieces that we want but don't have */
641    for( i = 0; i < inf->pieceCount; ++i )
642        if( !tor->info.pieces[i].dnd
643                && !tr_cpPieceIsComplete( tor->completion, i ) )
644            pool[poolSize++] = i;
645
646    /* sort the pool by which to request next */
647    if( poolSize > 1 )
648    {
649        tr_piece_index_t j;
650        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
651
652        for( j = 0; j < poolSize; ++j )
653        {
654            int k;
655            const tr_piece_index_t piece = pool[j];
656            struct tr_refill_piece * setme = p + j;
657
658            setme->piece = piece;
659            setme->priority = inf->pieces[piece].priority;
660            setme->peerCount = 0;
661            setme->random = tr_cryptoWeakRandInt( INT_MAX );
662            setme->pendingRequestCount = getPieceRequests( t, piece );
663            setme->missingBlockCount
664                         = tr_cpMissingBlocksInPiece( tor->completion, piece );
665
666            for( k = 0; k < peerCount; ++k )
667            {
668                const tr_peer * peer = peers[k];
669                if( peer->peerIsInterested
670                        && !peer->clientIsChoked
671                        && tr_bitfieldHas( peer->have, piece ) )
672                    ++setme->peerCount;
673            }
674        }
675
676        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
677               compareRefillPiece );
678
679        for( j = 0; j < poolSize; ++j )
680            pool[j] = p[j].piece;
681
682        tr_free( p );
683    }
684
685    tr_free( peers );
686
687    *pieceCount = poolSize;
688    return pool;
689}
690
691struct tr_blockIterator
692{
693    Torrent * t;
694    tr_block_index_t blockIndex, blockCount, *blocks;
695    tr_piece_index_t pieceIndex, pieceCount, *pieces;
696};
697
698static struct tr_blockIterator*
699blockIteratorNew( Torrent * t )
700{
701    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
702    i->t = t;
703    i->pieces = getPreferredPieces( t, &i->pieceCount );
704    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCount );
705    return i;
706}
707
708static int
709blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
710{
711    int found;
712    Torrent * t = i->t;
713    tr_torrent * tor = t->tor;
714
715    while( ( i->blockIndex == i->blockCount )
716        && ( i->pieceIndex < i->pieceCount ) )
717    {
718        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
719        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
720        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
721        tr_block_index_t block;
722
723        assert( index < tor->info.pieceCount );
724
725        i->blockCount = 0;
726        i->blockIndex = 0;
727        for( block=b; block!=e; ++block )
728            if( !tr_cpBlockIsComplete( tor->completion, block ) )
729                i->blocks[i->blockCount++] = block;
730    }
731
732    if(( found = ( i->blockIndex < i->blockCount )))
733        *setme = i->blocks[i->blockIndex++];
734
735    return found;
736}
737
738static void
739blockIteratorFree( struct tr_blockIterator * i )
740{
741    tr_free( i->blocks );
742    tr_free( i->pieces );
743    tr_free( i );
744}
745
746static tr_peer**
747getPeersUploadingToClient( Torrent * t,
748                           int *     setmeCount )
749{
750    int j;
751    int peerCount = 0;
752    int retCount = 0;
753    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
754    tr_peer ** ret = tr_new( tr_peer *, peerCount );
755
756    j = 0; /* this is a temporary test to make sure we walk through all the peers */
757    if( peerCount )
758    {
759        /* Get a list of peers we're downloading from.
760           Pick a different starting point each time so all peers
761           get a chance at being the first in line */
762        const int fencepost = tr_cryptoWeakRandInt( peerCount );
763        int i = fencepost;
764        do {
765            if( clientIsDownloadingFrom( peers[i] ) )
766                ret[retCount++] = peers[i];
767            i = ( i + 1 ) % peerCount;
768            ++j;
769        } while( i != fencepost );
770    }
771    assert( j == peerCount );
772    *setmeCount = retCount;
773    return ret;
774}
775
776static uint32_t
777getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
778{
779    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
780    const uint64_t blockPos = tor->blockSize * b;
781    assert( blockPos >= piecePos );
782    return (uint32_t)( blockPos - piecePos );
783}
784
785static int
786refillPulse( void * vtorrent )
787{
788    tr_block_index_t block;
789    int peerCount;
790    int webseedCount;
791    tr_peer ** peers;
792    tr_webseed ** webseeds;
793    struct tr_blockIterator * blockIterator;
794    Torrent * t = vtorrent;
795    tr_torrent * tor = t->tor;
796
797    if( !t->isRunning )
798        return TRUE;
799    if( tr_torrentIsSeed( t->tor ) )
800        return TRUE;
801
802    torrentLock( t );
803    tordbg( t, "Refilling Request Buffers..." );
804
805    blockIterator = blockIteratorNew( t );
806    peers = getPeersUploadingToClient( t, &peerCount );
807    webseedCount = tr_ptrArraySize( t->webseeds );
808    webseeds = tr_memdup( tr_ptrArrayBase( t->webseeds ),
809                          webseedCount * sizeof( tr_webseed* ) );
810
811    while( ( webseedCount || peerCount )
812        && blockIteratorNext( blockIterator, &block ) )
813    {
814        int j;
815        int handled = FALSE;
816
817        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
818        const uint32_t offset = getBlockOffsetInPiece( tor, block );
819        const uint32_t length = tr_torBlockCountBytes( tor, block );
820
821        /* find a peer who can ask for this block */
822        for( j=0; !handled && j<peerCount; )
823        {
824            const int val = tr_peerMsgsAddRequest( peers[j]->msgs, index, offset, length, FALSE );
825            switch( val )
826            {
827                case TR_ADDREQ_FULL:
828                case TR_ADDREQ_CLIENT_CHOKED:
829                    peers[j] = peers[--peerCount];
830                    break;
831
832                case TR_ADDREQ_MISSING:
833                case TR_ADDREQ_DUPLICATE:
834                    ++j;
835                    break;
836
837                case TR_ADDREQ_OK:
838                    incrementPieceRequests( t, index );
839                    handled = TRUE;
840                    break;
841
842                default:
843                    assert( 0 && "unhandled value" );
844                    break;
845            }
846        }
847
848        /* maybe one of the webseeds can do it */
849        for( j=0; !handled && j<webseedCount; )
850        {
851            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j],
852                                                          index, offset, length );
853            switch( val )
854            {
855                case TR_ADDREQ_FULL:
856                    webseeds[j] = webseeds[--webseedCount];
857                    break;
858
859                case TR_ADDREQ_OK:
860                    incrementPieceRequests( t, index );
861                    handled = TRUE;
862                    break;
863
864                default:
865                    assert( 0 && "unhandled value" );
866                    break;
867            }
868        }
869    }
870
871    /* cleanup */
872    blockIteratorFree( blockIterator );
873    tr_free( webseeds );
874    tr_free( peers );
875
876    t->refillTimer = NULL;
877    torrentUnlock( t );
878    return FALSE;
879}
880
881static void
882broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
883{
884    int i, size;
885    tr_peer ** peers;
886
887    assert( torrentIsLocked( t ) );
888
889    peers = getConnectedPeers( t, &size );
890    for( i=0; i<size; ++i )
891        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
892    tr_free( peers );
893}
894
895static void
896addStrike( Torrent * t,
897           tr_peer * peer )
898{
899    tordbg( t, "increasing peer %s strike count to %d",
900            tr_peerIoAddrStr( &peer->addr,
901                              peer->port ), peer->strikes + 1 );
902
903    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
904    {
905        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
906        atom->myflags |= MYFLAG_BANNED;
907        peer->doPurge = 1;
908        tordbg( t, "banning peer %s",
909               tr_peerIoAddrStr( &atom->addr, atom->port ) );
910    }
911}
912
913static void
914gotBadPiece( Torrent *        t,
915             tr_piece_index_t pieceIndex )
916{
917    tr_torrent *   tor = t->tor;
918    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
919
920    tor->corruptCur += byteCount;
921    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
922}
923
924static void
925refillSoon( Torrent * t )
926{
927    if( t->refillTimer == NULL )
928        t->refillTimer = tr_timerNew( t->manager->session,
929                                      refillPulse, t,
930                                      REFILL_PERIOD_MSEC );
931}
932
933static void
934peerSuggestedPiece( Torrent            * t,
935                    tr_peer            * peer,
936                    tr_piece_index_t     pieceIndex,
937                    int                  isFastAllowed )
938{
939    assert( t );
940    assert( peer );
941    assert( peer->msgs );
942
943    /* is this a valid piece? */
944    if(  pieceIndex >= t->tor->info.pieceCount )
945        return;
946
947    /* don't ask for it if we've already got it */
948    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
949        return;
950
951    /* don't ask for it if they don't have it */
952    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
953        return;
954
955    /* don't ask for it if we're choked and it's not fast */
956    if( !isFastAllowed && peer->clientIsChoked )
957        return;
958
959    /* request the blocks that we don't have in this piece */
960    {
961        tr_block_index_t block;
962        const tr_torrent * tor = t->tor;
963        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
964        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
965
966        for( block=start; block<end; ++block )
967        {
968            if( !tr_cpBlockIsComplete( tor->completion, block ) )
969            {
970                const uint32_t offset = getBlockOffsetInPiece( tor, block );
971                const uint32_t length = tr_torBlockCountBytes( tor, block );
972                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length, TRUE );
973                incrementPieceRequests( t, pieceIndex );
974            }
975        }
976    }
977}
978
979static void
980peerCallbackFunc( void * vpeer,
981                  void * vevent,
982                  void * vt )
983{
984    tr_peer             * peer = vpeer; /* may be NULL if peer is a webseed */
985    Torrent             * t = vt;
986    const tr_peer_event * e = vevent;
987
988    torrentLock( t );
989
990    switch( e->eventType )
991    {
992        case TR_PEER_UPLOAD_ONLY:
993            /* update our atom */
994            if( peer ) {
995                struct peer_atom * a = getExistingAtom( t, &peer->addr );
996                a->uploadOnly = e->uploadOnly ? UPLOAD_ONLY_YES : UPLOAD_ONLY_NO;
997            }
998            break;
999
1000        case TR_PEER_NEED_REQ:
1001            refillSoon( t );
1002            break;
1003
1004        case TR_PEER_CANCEL:
1005            decrementPieceRequests( t, e->pieceIndex );
1006            break;
1007
1008        case TR_PEER_PEER_GOT_DATA:
1009        {
1010            const time_t now = time( NULL );
1011            tr_torrent * tor = t->tor;
1012
1013            tor->activityDate = now;
1014
1015            if( e->wasPieceData )
1016                tor->uploadedCur += e->length;
1017
1018            /* update the stats */
1019            if( e->wasPieceData )
1020                tr_statsAddUploaded( tor->session, e->length );
1021
1022            /* update our atom */
1023            if( peer ) {
1024                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1025                a->piece_data_time = now;
1026            }
1027
1028            break;
1029        }
1030
1031        case TR_PEER_CLIENT_GOT_SUGGEST:
1032            if( peer )
1033                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1034            break;
1035
1036        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1037            if( peer )
1038                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1039            break;
1040
1041        case TR_PEER_CLIENT_GOT_DATA:
1042        {
1043            const time_t now = time( NULL );
1044            tr_torrent * tor = t->tor;
1045
1046            tor->activityDate = now;
1047
1048            /* only add this to downloadedCur if we got it from a peer --
1049             * webseeds shouldn't count against our ratio.  As one tracker
1050             * admin put it, "Those pieces are downloaded directly from the
1051             * content distributor, not the peers, it is the tracker's job
1052             * to manage the swarms, not the web server and does not fit
1053             * into the jurisdiction of the tracker." */
1054            if( peer && e->wasPieceData )
1055                tor->downloadedCur += e->length;
1056
1057            /* update the stats */ 
1058            if( e->wasPieceData )
1059                tr_statsAddDownloaded( tor->session, e->length );
1060
1061            /* update our atom */
1062            if( peer ) {
1063                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1064                a->piece_data_time = now;
1065            }
1066
1067            break;
1068        }
1069
1070        case TR_PEER_PEER_PROGRESS:
1071        {
1072            if( peer )
1073            {
1074                struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1075                const int          peerIsSeed = e->progress >= 1.0;
1076                if( peerIsSeed )
1077                {
1078                    tordbg( t, "marking peer %s as a seed",
1079                           tr_peerIoAddrStr( &atom->addr,
1080                                             atom->port ) );
1081                    atom->flags |= ADDED_F_SEED_FLAG;
1082                }
1083                else
1084                {
1085                    tordbg( t, "marking peer %s as a non-seed",
1086                           tr_peerIoAddrStr( &atom->addr,
1087                                             atom->port ) );
1088                    atom->flags &= ~ADDED_F_SEED_FLAG;
1089                }
1090            }
1091            break;
1092        }
1093
1094        case TR_PEER_CLIENT_GOT_BLOCK:
1095        {
1096            tr_torrent *     tor = t->tor;
1097
1098            tr_block_index_t block = _tr_block( tor, e->pieceIndex,
1099                                                e->offset );
1100
1101            tr_cpBlockAdd( tor->completion, block );
1102            decrementPieceRequests( t, e->pieceIndex );
1103
1104            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1105
1106            if( tr_cpPieceIsComplete( tor->completion, e->pieceIndex ) )
1107            {
1108                const tr_piece_index_t p = e->pieceIndex;
1109                const int              ok = tr_ioTestPiece( tor, p );
1110
1111                if( !ok )
1112                {
1113                    tr_torerr( tor,
1114                              _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1115                              (unsigned long)p );
1116                }
1117
1118                tr_torrentSetHasPiece( tor, p, ok );
1119                tr_torrentSetPieceChecked( tor, p, TRUE );
1120                tr_peerMgrSetBlame( tor->session->peerMgr, tor->info.hash, p, ok );
1121
1122                if( !ok )
1123                    gotBadPiece( t, p );
1124                else
1125                {
1126                    int        i, peerCount;
1127                    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1128                    for( i = 0; i < peerCount; ++i )
1129                        tr_peerMsgsHave( peers[i]->msgs, p );
1130                    tr_free( peers );
1131                }
1132
1133                tr_torrentRecheckCompleteness( tor );
1134            }
1135            break;
1136        }
1137
1138        case TR_PEER_ERROR:
1139            if( e->err == EINVAL )
1140            {
1141                addStrike( t, peer );
1142                peer->doPurge = 1;
1143            }
1144            else if( ( e->err == ERANGE )
1145                  || ( e->err == EMSGSIZE )
1146                  || ( e->err == ENOTCONN ) )
1147            {
1148                /* some protocol error from the peer */
1149                peer->doPurge = 1;
1150            }
1151            else /* a local error, such as an IO error */
1152            {
1153                t->tor->error = e->err;
1154                tr_strlcpy( t->tor->errorString,
1155                            tr_strerror( t->tor->error ),
1156                            sizeof( t->tor->errorString ) );
1157                tr_torrentStop( t->tor );
1158            }
1159            break;
1160
1161        default:
1162            assert( 0 );
1163    }
1164
1165    torrentUnlock( t );
1166}
1167
1168static void
1169ensureAtomExists( Torrent          * t,
1170                  const tr_address * addr,
1171                  tr_port            port,
1172                  uint8_t            flags,
1173                  uint8_t            from )
1174{
1175    if( getExistingAtom( t, addr ) == NULL )
1176    {
1177        struct peer_atom * a;
1178        a = tr_new0( struct peer_atom, 1 );
1179        a->addr = *addr;
1180        a->port = port;
1181        a->flags = flags;
1182        a->from = from;
1183        tordbg( t, "got a new atom: %s",
1184               tr_peerIoAddrStr( &a->addr, a->port ) );
1185        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
1186    }
1187}
1188
1189static int
1190getMaxPeerCount( const tr_torrent * tor )
1191{
1192    return tor->maxConnectedPeers;
1193}
1194
1195static int
1196getPeerCount( const Torrent * t )
1197{
1198    return tr_ptrArraySize( t->peers ) + tr_ptrArraySize(
1199               t->outgoingHandshakes );
1200}
1201
1202/* FIXME: this is kind of a mess. */
1203static int
1204myHandshakeDoneCB( tr_handshake *  handshake,
1205                   tr_peerIo *     io,
1206                   int             isConnected,
1207                   const uint8_t * peer_id,
1208                   void *          vmanager )
1209{
1210    int                ok = isConnected;
1211    int                success = FALSE;
1212    tr_port            port;
1213    const tr_address * addr;
1214    tr_peerMgr       * manager = vmanager;
1215    Torrent          * t;
1216    tr_handshake     * ours;
1217
1218    assert( io );
1219    assert( isConnected == 0 || isConnected == 1 );
1220
1221    t = tr_peerIoHasTorrentHash( io )
1222        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1223        : NULL;
1224
1225    if( tr_peerIoIsIncoming ( io ) )
1226        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
1227                                        handshake, handshakeCompare );
1228    else if( t )
1229        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
1230                                        handshake, handshakeCompare );
1231    else
1232        ours = handshake;
1233
1234    assert( ours );
1235    assert( ours == handshake );
1236
1237    if( t )
1238        torrentLock( t );
1239
1240    addr = tr_peerIoGetAddress( io, &port );
1241
1242    if( !ok || !t || !t->isRunning )
1243    {
1244        if( t )
1245        {
1246            struct peer_atom * atom = getExistingAtom( t, addr );
1247            if( atom )
1248                ++atom->numFails;
1249        }
1250
1251        tr_peerIoFree( io );
1252    }
1253    else /* looking good */
1254    {
1255        struct peer_atom * atom;
1256        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1257        atom = getExistingAtom( t, addr );
1258        atom->time = time( NULL );
1259
1260        if( atom->myflags & MYFLAG_BANNED )
1261        {
1262            tordbg( t, "banned peer %s tried to reconnect",
1263                   tr_peerIoAddrStr( &atom->addr,
1264                                     atom->port ) );
1265            tr_peerIoFree( io );
1266        }
1267        else if( tr_peerIoIsIncoming( io )
1268               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1269
1270        {
1271            tr_peerIoFree( io );
1272        }
1273        else
1274        {
1275            tr_peer * peer = getExistingPeer( t, addr );
1276
1277            if( peer ) /* we already have this peer */
1278            {
1279                tr_peerIoFree( io );
1280            }
1281            else
1282            {
1283                peer = getPeer( t, addr );
1284                tr_free( peer->client );
1285
1286                if( !peer_id )
1287                    peer->client = NULL;
1288                else {
1289                    char client[128];
1290                    tr_clientForId( client, sizeof( client ), peer_id );
1291                    peer->client = tr_strdup( client );
1292                }
1293
1294                peer->port = port;
1295                peer->io = io;
1296                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1297                tr_peerIoSetBandwidth( io, peer->bandwidth );
1298
1299                success = TRUE;
1300            }
1301        }
1302    }
1303
1304    if( t )
1305        torrentUnlock( t );
1306
1307    return success;
1308}
1309
1310void
1311tr_peerMgrAddIncoming( tr_peerMgr * manager,
1312                       tr_address * addr,
1313                       tr_port      port,
1314                       int          socket )
1315{
1316    managerLock( manager );
1317
1318    if( tr_sessionIsAddressBlocked( manager->session, addr ) )
1319    {
1320        tr_dbg( "Banned IP address \"%s\" tried to connect to us",
1321               tr_ntop_non_ts( addr ) );
1322        tr_netClose( socket );
1323    }
1324    else if( getExistingHandshake( manager->incomingHandshakes, addr ) )
1325    {
1326        tr_netClose( socket );
1327    }
1328    else /* we don't have a connetion to them yet... */
1329    {
1330        tr_peerIo *    io;
1331        tr_handshake * handshake;
1332
1333        io = tr_peerIoNewIncoming( manager->session, addr, port, socket );
1334
1335        handshake = tr_handshakeNew( io,
1336                                     manager->session->encryptionMode,
1337                                     myHandshakeDoneCB,
1338                                     manager );
1339
1340        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake,
1341                                 handshakeCompare );
1342    }
1343
1344    managerUnlock( manager );
1345}
1346
1347void
1348tr_peerMgrAddPex( tr_peerMgr *    manager,
1349                  const uint8_t * torrentHash,
1350                  uint8_t         from,
1351                  const tr_pex *  pex )
1352{
1353    Torrent * t;
1354
1355    managerLock( manager );
1356
1357    t = getExistingTorrent( manager, torrentHash );
1358    if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
1359        ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from );
1360
1361    managerUnlock( manager );
1362}
1363
1364tr_pex *
1365tr_peerMgrCompactToPex( const void *    compact,
1366                        size_t          compactLen,
1367                        const uint8_t * added_f,
1368                        size_t          added_f_len,
1369                        size_t *        pexCount )
1370{
1371    size_t          i;
1372    size_t          n = compactLen / 6;
1373    const uint8_t * walk = compact;
1374    tr_pex *        pex = tr_new0( tr_pex, n );
1375
1376    for( i = 0; i < n; ++i )
1377    {
1378        pex[i].addr.type = TR_AF_INET;
1379        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
1380        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1381        if( added_f && ( n == added_f_len ) )
1382            pex[i].flags = added_f[i];
1383    }
1384
1385    *pexCount = n;
1386    return pex;
1387}
1388
1389/**
1390***
1391**/
1392
1393void
1394tr_peerMgrSetBlame( tr_peerMgr *     manager,
1395                    const uint8_t *  torrentHash,
1396                    tr_piece_index_t pieceIndex,
1397                    int              success )
1398{
1399    if( !success )
1400    {
1401        int        peerCount, i;
1402        Torrent *  t = getExistingTorrent( manager, torrentHash );
1403        tr_peer ** peers;
1404
1405        assert( torrentIsLocked( t ) );
1406
1407        peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1408        for( i = 0; i < peerCount; ++i )
1409        {
1410            tr_peer * peer = peers[i];
1411            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1412            {
1413                tordbg(
1414                    t,
1415                    "peer %s contributed to corrupt piece (%d); now has %d strikes",
1416                    tr_peerIoAddrStr( &peer->addr, peer->port ),
1417                    pieceIndex, (int)peer->strikes + 1 );
1418                addStrike( t, peer );
1419            }
1420        }
1421    }
1422}
1423
1424int
1425tr_pexCompare( const void * va,
1426               const void * vb )
1427{
1428    const tr_pex * a = va;
1429    const tr_pex * b = vb;
1430    int            i = tr_compareAddresses( &a->addr, &b->addr );
1431
1432    if( i ) return i;
1433    if( a->port < b->port ) return -1;
1434    if( a->port > b->port ) return 1;
1435    return 0;
1436}
1437
1438int tr_pexCompare( const void * a,
1439                   const void * b );
1440
1441static int
1442peerPrefersCrypto( const tr_peer * peer )
1443{
1444    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1445        return TRUE;
1446
1447    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1448        return FALSE;
1449
1450    return tr_peerIoIsEncrypted( peer->io );
1451}
1452
1453int
1454tr_peerMgrGetPeers( tr_peerMgr *    manager,
1455                    const uint8_t * torrentHash,
1456                    tr_pex **       setme_pex )
1457{
1458    int peerCount = 0;
1459    const Torrent *  t;
1460
1461    managerLock( manager );
1462
1463    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1464    if( !t )
1465    {
1466        *setme_pex = NULL;
1467    }
1468    else
1469    {
1470        int i;
1471        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1472        tr_pex * pex = tr_new( tr_pex, peerCount );
1473        tr_pex * walk = pex;
1474
1475        for( i = 0; i < peerCount; ++i, ++walk )
1476        {
1477            const tr_peer * peer = peers[i];
1478            const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1479
1480            walk->addr = peer->addr;
1481            walk->port = peer->port;
1482            walk->flags = 0;
1483            if( peerPrefersCrypto( peer ) )
1484                walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1485            if( ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 ) )
1486                walk->flags |= ADDED_F_SEED_FLAG;
1487        }
1488
1489        assert( ( walk - pex ) == peerCount );
1490        qsort( pex, peerCount, sizeof( tr_pex ), tr_pexCompare );
1491        *setme_pex = pex;
1492    }
1493
1494    managerUnlock( manager );
1495    return peerCount;
1496}
1497
1498static int reconnectPulse( void * vtorrent );
1499
1500static int rechokePulse( void * vtorrent );
1501
1502void
1503tr_peerMgrStartTorrent( tr_peerMgr *    manager,
1504                        const uint8_t * torrentHash )
1505{
1506    Torrent * t;
1507
1508    managerLock( manager );
1509
1510    t = getExistingTorrent( manager, torrentHash );
1511
1512    assert( t );
1513    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1514    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1515
1516    if( !t->isRunning )
1517    {
1518        t->isRunning = 1;
1519
1520        t->reconnectTimer = tr_timerNew( t->manager->session,
1521                                         reconnectPulse, t,
1522                                         RECONNECT_PERIOD_MSEC );
1523
1524        t->rechokeTimer = tr_timerNew( t->manager->session,
1525                                       rechokePulse, t,
1526                                       RECHOKE_PERIOD_MSEC );
1527
1528        reconnectPulse( t );
1529
1530        rechokePulse( t );
1531
1532        if( !tr_ptrArrayEmpty( t->webseeds ) )
1533            refillSoon( t );
1534    }
1535
1536    managerUnlock( manager );
1537}
1538
1539static void
1540stopTorrent( Torrent * t )
1541{
1542    assert( torrentIsLocked( t ) );
1543
1544    t->isRunning = 0;
1545    tr_timerFree( &t->rechokeTimer );
1546    tr_timerFree( &t->reconnectTimer );
1547
1548    /* disconnect the peers. */
1549    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)peerDestructor );
1550    tr_ptrArrayClear( t->peers );
1551
1552    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1553     * which removes the handshake from t->outgoingHandshakes... */
1554    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1555        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1556}
1557
1558void
1559tr_peerMgrStopTorrent( tr_peerMgr *    manager,
1560                       const uint8_t * torrentHash )
1561{
1562    managerLock( manager );
1563
1564    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1565
1566    managerUnlock( manager );
1567}
1568
1569void
1570tr_peerMgrAddTorrent( tr_peerMgr * manager,
1571                      tr_torrent * tor )
1572{
1573    Torrent * t;
1574
1575    managerLock( manager );
1576
1577    assert( tor );
1578    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1579
1580    t = torrentConstructor( manager, tor );
1581    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1582
1583    managerUnlock( manager );
1584}
1585
1586void
1587tr_peerMgrRemoveTorrent( tr_peerMgr *    manager,
1588                         const uint8_t * torrentHash )
1589{
1590    Torrent * t;
1591
1592    managerLock( manager );
1593
1594    t = getExistingTorrent( manager, torrentHash );
1595    assert( t );
1596    stopTorrent( t );
1597    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1598    torrentDestructor( t );
1599
1600    managerUnlock( manager );
1601}
1602
1603void
1604tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1605                               const uint8_t *    torrentHash,
1606                               int8_t *           tab,
1607                               unsigned int       tabCount )
1608{
1609    tr_piece_index_t   i;
1610    const Torrent *    t;
1611    const tr_torrent * tor;
1612    float              interval;
1613    int                isSeed;
1614    int                peerCount;
1615    const tr_peer **   peers;
1616
1617    managerLock( manager );
1618
1619    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1620    tor = t->tor;
1621    interval = tor->info.pieceCount / (float)tabCount;
1622    isSeed = tor && ( tr_cpGetStatus ( tor->completion ) == TR_SEED );
1623    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1624
1625    memset( tab, 0, tabCount );
1626
1627    for( i = 0; tor && i < tabCount; ++i )
1628    {
1629        const int piece = i * interval;
1630
1631        if( isSeed || tr_cpPieceIsComplete( tor->completion, piece ) )
1632            tab[i] = -1;
1633        else if( peerCount )
1634        {
1635            int j;
1636            for( j = 0; j < peerCount; ++j )
1637                if( tr_bitfieldHas( peers[j]->have, i ) )
1638                    ++tab[i];
1639        }
1640    }
1641
1642    managerUnlock( manager );
1643}
1644
1645/* Returns the pieces that are available from peers */
1646tr_bitfield*
1647tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1648                        const uint8_t *    torrentHash )
1649{
1650    int           i, size;
1651    Torrent *     t;
1652    tr_peer **    peers;
1653    tr_bitfield * pieces;
1654
1655    managerLock( manager );
1656
1657    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1658    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1659    peers = getConnectedPeers( t, &size );
1660    for( i = 0; i < size; ++i )
1661        tr_bitfieldOr( pieces, peers[i]->have );
1662
1663    managerUnlock( manager );
1664    tr_free( peers );
1665    return pieces;
1666}
1667
1668int
1669tr_peerMgrHasConnections( const tr_peerMgr * manager,
1670                          const uint8_t *    torrentHash )
1671{
1672    int             ret;
1673    const Torrent * t;
1674
1675    managerLock( manager );
1676
1677    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1678    ret = t && ( !tr_ptrArrayEmpty( t->peers )
1679               || !tr_ptrArrayEmpty( t->webseeds ) );
1680
1681    managerUnlock( manager );
1682    return ret;
1683}
1684
1685void
1686tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1687                        const uint8_t *    torrentHash,
1688                        int *              setmePeersKnown,
1689                        int *              setmePeersConnected,
1690                        int *              setmeSeedsConnected,
1691                        int *              setmeWebseedsSendingToUs,
1692                        int *              setmePeersSendingToUs,
1693                        int *              setmePeersGettingFromUs,
1694                        int *              setmePeersFrom )
1695{
1696    int                 i, size;
1697    const Torrent *     t;
1698    const tr_peer **    peers;
1699    const tr_webseed ** webseeds;
1700
1701    managerLock( manager );
1702
1703    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1704    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1705
1706    *setmePeersKnown           = tr_ptrArraySize( t->pool );
1707    *setmePeersConnected       = 0;
1708    *setmeSeedsConnected       = 0;
1709    *setmePeersGettingFromUs   = 0;
1710    *setmePeersSendingToUs     = 0;
1711    *setmeWebseedsSendingToUs  = 0;
1712
1713    for( i = 0; i < TR_PEER_FROM__MAX; ++i )
1714        setmePeersFrom[i] = 0;
1715
1716    for( i = 0; i < size; ++i )
1717    {
1718        const tr_peer *          peer = peers[i];
1719        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1720
1721        if( peer->io == NULL ) /* not connected */
1722            continue;
1723
1724        ++ * setmePeersConnected;
1725
1726        ++setmePeersFrom[atom->from];
1727
1728        if( clientIsDownloadingFrom( peer ) )
1729            ++ * setmePeersSendingToUs;
1730
1731        if( clientIsUploadingTo( peer ) )
1732            ++ * setmePeersGettingFromUs;
1733
1734        if( atom->flags & ADDED_F_SEED_FLAG )
1735            ++ * setmeSeedsConnected;
1736    }
1737
1738    webseeds = (const tr_webseed **) tr_ptrArrayPeek( t->webseeds, &size );
1739    for( i = 0; i < size; ++i )
1740    {
1741        if( tr_webseedIsActive( webseeds[i] ) )
1742            ++ * setmeWebseedsSendingToUs;
1743    }
1744
1745    managerUnlock( manager );
1746}
1747
1748float*
1749tr_peerMgrWebSpeeds( const tr_peerMgr * manager,
1750                     const uint8_t *    torrentHash )
1751{
1752    const Torrent *     t;
1753    const tr_webseed ** webseeds;
1754    int                 i;
1755    int                 webseedCount;
1756    float *             ret;
1757
1758    assert( manager );
1759    managerLock( manager );
1760
1761    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1762    webseeds = (const tr_webseed**) tr_ptrArrayPeek( t->webseeds,
1763                                                     &webseedCount );
1764    assert( webseedCount == t->tor->info.webseedCount );
1765    ret = tr_new0( float, webseedCount );
1766
1767    for( i = 0; i < webseedCount; ++i )
1768        if( !tr_webseedGetSpeed( webseeds[i], &ret[i] ) )
1769            ret[i] = -1.0;
1770
1771    managerUnlock( manager );
1772    return ret;
1773}
1774
1775double
1776tr_peerGetPieceSpeed( const tr_peer    * peer,
1777                      tr_direction       direction )
1778{
1779    assert( peer );
1780    assert( direction==TR_CLIENT_TO_PEER || direction==TR_PEER_TO_CLIENT );
1781
1782    return tr_bandwidthGetPieceSpeed( peer->bandwidth, direction );
1783}
1784
1785
1786struct tr_peer_stat *
1787tr_peerMgrPeerStats( const   tr_peerMgr  * manager,
1788                     const   uint8_t     * torrentHash,
1789                     int                 * setmeCount UNUSED )
1790{
1791    int             i, size;
1792    const Torrent * t;
1793    tr_peer **      peers;
1794    tr_peer_stat *  ret;
1795
1796    assert( manager );
1797    managerLock( manager );
1798
1799    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1800    peers = getConnectedPeers( (Torrent*)t, &size );
1801    ret = tr_new0( tr_peer_stat, size );
1802
1803    for( i = 0; i < size; ++i )
1804    {
1805        char *                   pch;
1806        const tr_peer *          peer = peers[i];
1807        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1808        tr_peer_stat *           stat = ret + i;
1809
1810        tr_ntop( &peer->addr, stat->addr, sizeof( stat->addr ) );
1811        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
1812                   sizeof( stat->client ) );
1813        stat->port               = ntohs( peer->port );
1814        stat->from               = atom->from;
1815        stat->progress           = peer->progress;
1816        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1817        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER );
1818        stat->rateToClient       = tr_peerGetPieceSpeed( peer, TR_PEER_TO_CLIENT );
1819        stat->peerIsChoked       = peer->peerIsChoked;
1820        stat->peerIsInterested   = peer->peerIsInterested;
1821        stat->clientIsChoked     = peer->clientIsChoked;
1822        stat->clientIsInterested = peer->clientIsInterested;
1823        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1824        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1825        stat->isUploadingTo      = clientIsUploadingTo( peer );
1826        stat->isSeed             = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
1827
1828        pch = stat->flagStr;
1829        if( t->optimistic == peer ) *pch++ = 'O';
1830        if( stat->isDownloadingFrom ) *pch++ = 'D';
1831        else if( stat->clientIsInterested ) *pch++ = 'd';
1832        if( stat->isUploadingTo ) *pch++ = 'U';
1833        else if( stat->peerIsInterested ) *pch++ = 'u';
1834        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ =
1835                'K';
1836        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1837        if( stat->isEncrypted ) *pch++ = 'E';
1838        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1839        if( stat->isIncoming ) *pch++ = 'I';
1840        *pch = '\0';
1841    }
1842
1843    *setmeCount = size;
1844    tr_free( peers );
1845
1846    managerUnlock( manager );
1847    return ret;
1848}
1849
1850/**
1851***
1852**/
1853
1854struct ChokeData
1855{
1856    tr_bool         doUnchoke;
1857    tr_bool         isInterested;
1858    tr_bool         isChoked;
1859    int             rate;
1860    tr_peer *       peer;
1861};
1862
1863static int
1864compareChoke( const void * va,
1865              const void * vb )
1866{
1867    const struct ChokeData * a = va;
1868    const struct ChokeData * b = vb;
1869
1870    if( a->rate != b->rate ) /* prefer higher overall speeds */
1871        return a->rate > b->rate ? -1 : 1;
1872
1873    if( a->isChoked != b->isChoked ) /* prefer unchoked */
1874        return a->isChoked ? 1 : -1;
1875
1876    return 0;
1877}
1878
1879static int
1880isNew( const tr_peer * peer )
1881{
1882    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1883}
1884
1885static int
1886isSame( const tr_peer * peer )
1887{
1888    return peer && peer->client && strstr( peer->client, "Transmission" );
1889}
1890
1891/**
1892***
1893**/
1894
1895static void
1896rechoke( Torrent * t )
1897{
1898    int                i, peerCount, size, unchokedInterested;
1899    tr_peer **         peers = getConnectedPeers( t, &peerCount );
1900    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1901    const int          chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
1902
1903    assert( torrentIsLocked( t ) );
1904
1905    /* sort the peers by preference and rate */
1906    for( i = 0, size = 0; i < peerCount; ++i )
1907    {
1908        tr_peer * peer = peers[i];
1909        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1910
1911        if( peer->progress >= 1.0 ) /* choke all seeds */
1912        {
1913            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1914        }
1915        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
1916        {
1917            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1918        }
1919        else if( chokeAll ) /* choke everyone if we're not uploading */
1920        {
1921            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1922        }
1923        else
1924        {
1925            struct ChokeData * n = &choke[size++];
1926            n->peer         = peer;
1927            n->isInterested = peer->peerIsInterested;
1928            n->isChoked     = peer->peerIsChoked;
1929            n->rate         = tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER ) * 1024;
1930        }
1931    }
1932
1933    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
1934
1935    /**
1936     * Reciprocation and number of uploads capping is managed by unchoking
1937     * the N peers which have the best upload rate and are interested.
1938     * This maximizes the client's download rate. These N peers are
1939     * referred to as downloaders, because they are interested in downloading
1940     * from the client.
1941     *
1942     * Peers which have a better upload rate (as compared to the downloaders)
1943     * but aren't interested get unchoked. If they become interested, the
1944     * downloader with the worst upload rate gets choked. If a client has
1945     * a complete file, it uses its upload rate rather than its download
1946     * rate to decide which peers to unchoke.
1947     */
1948    unchokedInterested = 0;
1949    for( i = 0; i < size && unchokedInterested < MAX_UNCHOKED_PEERS; ++i )
1950    {
1951        choke[i].doUnchoke = 1;
1952        if( choke[i].isInterested )
1953            ++unchokedInterested;
1954    }
1955
1956    /* optimistic unchoke */
1957    if( i < size )
1958    {
1959        int                n;
1960        struct ChokeData * c;
1961        tr_ptrArray *      randPool = tr_ptrArrayNew( );
1962
1963        for( ; i < size; ++i )
1964        {
1965            if( choke[i].isInterested )
1966            {
1967                const tr_peer * peer = choke[i].peer;
1968                int             x = 1, y;
1969                if( isNew( peer ) ) x *= 3;
1970                if( isSame( peer ) ) x *= 3;
1971                for( y = 0; y < x; ++y )
1972                    tr_ptrArrayAppend( randPool, &choke[i] );
1973            }
1974        }
1975
1976        if( ( n = tr_ptrArraySize( randPool ) ) )
1977        {
1978            c = tr_ptrArrayNth( randPool, tr_cryptoWeakRandInt( n ) );
1979            c->doUnchoke = 1;
1980            t->optimistic = c->peer;
1981        }
1982
1983        tr_ptrArrayFree( randPool, NULL );
1984    }
1985
1986    for( i = 0; i < size; ++i )
1987        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1988
1989    /* cleanup */
1990    tr_free( choke );
1991    tr_free( peers );
1992}
1993
1994static int
1995rechokePulse( void * vtorrent )
1996{
1997    Torrent * t = vtorrent;
1998
1999    torrentLock( t );
2000    rechoke( t );
2001    torrentUnlock( t );
2002    return TRUE;
2003}
2004
2005/***
2006****
2007****  Life and Death
2008****
2009***/
2010
2011static int
2012shouldPeerBeClosed( const Torrent * t,
2013                    const tr_peer * peer,
2014                    int             peerCount )
2015{
2016    const tr_torrent *       tor = t->tor;
2017    const time_t             now = time( NULL );
2018    const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
2019
2020    /* if it's marked for purging, close it */
2021    if( peer->doPurge )
2022    {
2023        tordbg( t, "purging peer %s because its doPurge flag is set",
2024               tr_peerIoAddrStr( &atom->addr,
2025                                 atom->port ) );
2026        return TRUE;
2027    }
2028
2029    /* if we're seeding and the peer has everything we have,
2030     * and enough time has passed for a pex exchange, then disconnect */
2031    if( tr_torrentIsSeed( tor ) )
2032    {
2033        int peerHasEverything;
2034        if( atom->flags & ADDED_F_SEED_FLAG )
2035            peerHasEverything = TRUE;
2036        else if( peer->progress < tr_cpPercentDone( tor->completion ) )
2037            peerHasEverything = FALSE;
2038        else
2039        {
2040            tr_bitfield * tmp =
2041                tr_bitfieldDup( tr_cpPieceBitfield( tor->completion ) );
2042            tr_bitfieldDifference( tmp, peer->have );
2043            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2044            tr_bitfieldFree( tmp );
2045        }
2046        if( peerHasEverything
2047          && ( !tr_torrentAllowsPex( tor ) || ( now - atom->time >= 30 ) ) )
2048        {
2049            tordbg( t, "purging peer %s because we're both seeds",
2050                   tr_peerIoAddrStr( &atom->addr,
2051                                     atom->port ) );
2052            return TRUE;
2053        }
2054    }
2055
2056    /* disconnect if it's been too long since piece data has been transferred.
2057     * this is on a sliding scale based on number of available peers... */
2058    {
2059        const int    relaxStrictnessIfFewerThanN =
2060            (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2061        /* if we have >= relaxIfFewerThan, strictness is 100%.
2062         * if we have zero connections, strictness is 0% */
2063        const float  strictness = peerCount >= relaxStrictnessIfFewerThanN
2064                                  ? 1.0
2065                                  : peerCount /
2066                                  (float)relaxStrictnessIfFewerThanN;
2067        const int    lo = MIN_UPLOAD_IDLE_SECS;
2068        const int    hi = MAX_UPLOAD_IDLE_SECS;
2069        const int    limit = lo + ( ( hi - lo ) * strictness );
2070        const time_t then = peer->pieceDataActivityDate;
2071        const int    idleTime = then ? ( now - then ) : 0;
2072        if( idleTime > limit )
2073        {
2074            tordbg(
2075                t,
2076                "purging peer %s because it's been %d secs since we shared anything",
2077                tr_peerIoAddrStr( &atom->addr, atom->port ), idleTime );
2078            return TRUE;
2079        }
2080    }
2081
2082    return FALSE;
2083}
2084
2085static tr_peer **
2086getPeersToClose( Torrent * t,
2087                 int *     setmeSize )
2088{
2089    int               i, peerCount, outsize;
2090    tr_peer **        peers = (tr_peer**) tr_ptrArrayPeek( t->peers,
2091                                                           &peerCount );
2092    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2093
2094    assert( torrentIsLocked( t ) );
2095
2096    for( i = outsize = 0; i < peerCount; ++i )
2097        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
2098            ret[outsize++] = peers[i];
2099
2100    *setmeSize = outsize;
2101    return ret;
2102}
2103
2104static int
2105compareCandidates( const void * va,
2106                   const void * vb )
2107{
2108    const struct peer_atom * a = *(const struct peer_atom**) va;
2109    const struct peer_atom * b = *(const struct peer_atom**) vb;
2110
2111    /* <Charles> Here we would probably want to try reconnecting to
2112     * peers that had most recently given us data. Lots of users have
2113     * trouble with resets due to their routers and/or ISPs. This way we
2114     * can quickly recover from an unwanted reset. So we sort
2115     * piece_data_time in descending order.
2116     */
2117
2118    if( a->piece_data_time != b->piece_data_time )
2119        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2120
2121    if( a->numFails != b->numFails )
2122        return a->numFails < b->numFails ? -1 : 1;
2123
2124    if( a->time != b->time )
2125        return a->time < b->time ? -1 : 1;
2126
2127    return 0;
2128}
2129
2130static int
2131getReconnectIntervalSecs( const struct peer_atom * atom )
2132{
2133    int          sec;
2134    const time_t now = time( NULL );
2135
2136    /* if we were recently connected to this peer and transferring piece
2137     * data, try to reconnect to them sooner rather that later -- we don't
2138     * want network troubles to get in the way of a good peer. */
2139    if( ( now - atom->piece_data_time ) <=
2140       ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2141        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2142
2143    /* don't allow reconnects more often than our minimum */
2144    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2145        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2146
2147    /* otherwise, the interval depends on how many times we've tried
2148     * and failed to connect to the peer */
2149    else switch( atom->numFails )
2150        {
2151            case 0:
2152                sec = 0; break;
2153
2154            case 1:
2155                sec = 5; break;
2156
2157            case 2:
2158                sec = 2 * 60; break;
2159
2160            case 3:
2161                sec = 15 * 60; break;
2162
2163            case 4:
2164                sec = 30 * 60; break;
2165
2166            case 5:
2167                sec = 60 * 60; break;
2168
2169            default:
2170                sec = 120 * 60; break;
2171        }
2172
2173    return sec;
2174}
2175
2176static struct peer_atom **
2177getPeerCandidates(                               Torrent * t,
2178                                           int * setmeSize )
2179{
2180    int                 i, atomCount, retCount;
2181    struct peer_atom ** atoms;
2182    struct peer_atom ** ret;
2183    const time_t        now = time( NULL );
2184    const int           seed = tr_torrentIsSeed( t->tor );
2185
2186    assert( torrentIsLocked( t ) );
2187
2188    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &atomCount );
2189    ret = tr_new( struct peer_atom*, atomCount );
2190    for( i = retCount = 0; i < atomCount; ++i )
2191    {
2192        int                interval;
2193        struct peer_atom * atom = atoms[i];
2194
2195        /* peer fed us too much bad data ... we only keep it around
2196         * now to weed it out in case someone sends it to us via pex */
2197        if( atom->myflags & MYFLAG_BANNED )
2198            continue;
2199
2200        /* peer was unconnectable before, so we're not going to keep trying.
2201         * this is needs a separate flag from `banned', since if they try
2202         * to connect to us later, we'll let them in */
2203        if( atom->myflags & MYFLAG_UNREACHABLE )
2204            continue;
2205
2206        /* we don't need two connections to the same peer... */
2207        if( peerIsInUse( t, &atom->addr ) )
2208            continue;
2209
2210        /* no need to connect if we're both seeds... */
2211        if( seed && ( ( atom->flags & ADDED_F_SEED_FLAG ) ||
2212                      ( atom->uploadOnly == UPLOAD_ONLY_YES ) ) )
2213            continue;
2214
2215        /* don't reconnect too often */
2216        interval = getReconnectIntervalSecs( atom );
2217        if( ( now - atom->time ) < interval )
2218        {
2219            tordbg(
2220                t,
2221                "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2222                i, tr_peerIoAddrStr( &atom->addr,
2223                                     atom->port ), interval );
2224            continue;
2225        }
2226
2227        /* Don't connect to peers in our blocklist */
2228        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2229            continue;
2230
2231        ret[retCount++] = atom;
2232    }
2233
2234    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2235    *setmeSize = retCount;
2236    return ret;
2237}
2238
2239static int
2240reconnectPulse( void * vtorrent )
2241{
2242    Torrent *     t = vtorrent;
2243    static time_t prevTime = 0;
2244    static int    newConnectionsThisSecond = 0;
2245    time_t        now;
2246
2247    torrentLock( t );
2248
2249    now = time( NULL );
2250    if( prevTime != now )
2251    {
2252        prevTime = now;
2253        newConnectionsThisSecond = 0;
2254    }
2255
2256    if( !t->isRunning )
2257    {
2258        removeAllPeers( t );
2259    }
2260    else
2261    {
2262        int                 i, nCandidates, nBad;
2263        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
2264        struct tr_peer **   connections = getPeersToClose( t, &nBad );
2265
2266        if( nBad || nCandidates )
2267            tordbg(
2268                t, "reconnect pulse for [%s]: %d bad connections, "
2269                   "%d connection candidates, %d atoms, max per pulse is %d",
2270                t->tor->info.name, nBad, nCandidates,
2271                tr_ptrArraySize( t->pool ),
2272                (int)MAX_RECONNECTIONS_PER_PULSE );
2273
2274        /* disconnect some peers.
2275           if we transferred piece data, then they might be good peers,
2276           so reset their `numFails' weight to zero.  otherwise we connected
2277           to them fruitlessly, so mark it as another fail */
2278        for( i = 0; i < nBad; ++i )
2279        {
2280            tr_peer *          peer = connections[i];
2281            struct peer_atom * atom = getExistingAtom( t, &peer->addr );
2282            if( peer->pieceDataActivityDate )
2283                atom->numFails = 0;
2284            else
2285                ++atom->numFails;
2286            tordbg( t, "removing bad peer %s",
2287                   tr_peerIoGetAddrStr( peer->io ) );
2288            removePeer( t, peer );
2289        }
2290
2291        /* add some new ones */
2292        for( i = 0;    ( i < nCandidates )
2293           && ( i < MAX_RECONNECTIONS_PER_PULSE )
2294           && ( getPeerCount( t ) < getMaxPeerCount( t->tor ) )
2295           && ( newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND );
2296             ++i )
2297        {
2298            tr_peerMgr *       mgr = t->manager;
2299            struct peer_atom * atom = candidates[i];
2300            tr_peerIo *        io;
2301
2302            tordbg( t, "Starting an OUTGOING connection with %s",
2303                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2304
2305            io =
2306                tr_peerIoNewOutgoing( mgr->session, &atom->addr, atom->port,
2307                                      t->hash );
2308            if( io == NULL )
2309            {
2310                atom->myflags |= MYFLAG_UNREACHABLE;
2311            }
2312            else
2313            {
2314                tr_handshake * handshake = tr_handshakeNew(
2315                    io,
2316                    mgr->session->
2317                    encryptionMode,
2318                    myHandshakeDoneCB,
2319                    mgr );
2320
2321                assert( tr_peerIoGetTorrentHash( io ) );
2322
2323                ++newConnectionsThisSecond;
2324
2325                tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake,
2326                                         handshakeCompare );
2327            }
2328
2329            atom->time = time( NULL );
2330        }
2331
2332        /* cleanup */
2333        tr_free( connections );
2334        tr_free( candidates );
2335    }
2336
2337    torrentUnlock( t );
2338    return TRUE;
2339}
2340
2341/****
2342*****
2343*****  BANDWIDTH ALLOCATION
2344*****
2345****/
2346
2347static void
2348pumpAllPeers( tr_peerMgr * mgr )
2349{
2350    const int torrentCount = tr_ptrArraySize( mgr->torrents );
2351    int       i, j;
2352
2353    for( i=0; i<torrentCount; ++i )
2354    {
2355        Torrent * t = tr_ptrArrayNth( mgr->torrents, i );
2356        for( j=0; j<tr_ptrArraySize( t->peers ); ++j )
2357        {
2358            tr_peer * peer = tr_ptrArrayNth( t->peers, j );
2359            tr_peerMsgsPulse( peer->msgs );
2360        }
2361    }
2362}
2363
2364static int
2365bandwidthPulse( void * vmgr )
2366{
2367    tr_peerMgr * mgr = vmgr;
2368    managerLock( mgr );
2369
2370    pumpAllPeers( mgr );
2371    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
2372    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
2373    pumpAllPeers( mgr );
2374
2375    managerUnlock( mgr );
2376    return TRUE;
2377}
Note: See TracBrowser for help on using the repository browser.