source: trunk/libtransmission/peer-mgr.c @ 7423

Last change on this file since 7423 was 7423, checked in by charles, 12 years ago

(trunk libT) more runtime safety checks

  • Property svn:keywords set to Date Rev Author Id
File size: 67.6 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 7423 2008-12-16 23:31:05Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h> /* memcpy, memcmp, strstr */
16#include <stdlib.h> /* qsort */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "bandwidth.h"
23#include "bencode.h"
24#include "blocklist.h"
25#include "clients.h"
26#include "completion.h"
27#include "crypto.h"
28#include "handshake.h"
29#include "inout.h" /* tr_ioTestPiece */
30#include "net.h"
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-mgr-private.h"
34#include "peer-msgs.h"
35#include "ptrarray.h"
36#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
37#include "torrent.h"
38#include "trevent.h"
39#include "utils.h"
40#include "webseed.h"
41
42enum
43{
44    /* how frequently to change which peers are choked */
45    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
46
47    /* minimum interval for refilling peers' request lists */
48    REFILL_PERIOD_MSEC = 333,
49
50    /* when many peers are available, keep idle ones this long */
51    MIN_UPLOAD_IDLE_SECS = ( 30 ),
52
53    /* when few peers are available, keep idle ones this long */
54    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
55
56    /* how frequently to decide which peers live and die */
57    RECONNECT_PERIOD_MSEC = ( 2 * 1000 ),
58   
59    /* how frequently to reallocate bandwidth */
60    BANDWIDTH_PERIOD_MSEC = 200,
61
62    /* max # of peers to ask fer per torrent per reconnect pulse */
63    MAX_RECONNECTIONS_PER_PULSE = 4,
64
65    /* max number of peers to ask for per second overall.
66    * this throttle is to avoid overloading the router */
67    MAX_CONNECTIONS_PER_SECOND = 8,
68
69    /* number of unchoked peers per torrent.
70     * FIXME: this probably ought to be configurable */
71    MAX_UNCHOKED_PEERS = 14,
72
73    /* number of bad pieces a peer is allowed to send before we ban them */
74    MAX_BAD_PIECES_PER_PEER = 5,
75
76    /* use for bitwise operations w/peer_atom.myflags */
77    MYFLAG_BANNED = 1,
78
79    /* unreachable for now... but not banned.
80     * if they try to connect to us it's okay */
81    MYFLAG_UNREACHABLE = 2,
82
83    /* the minimum we'll wait before attempting to reconnect to a peer */
84    MINIMUM_RECONNECT_INTERVAL_SECS = 5
85};
86
87
88/**
89***
90**/
91
92enum
93{
94    UPLOAD_ONLY_UKNOWN,
95    UPLOAD_ONLY_YES,
96    UPLOAD_ONLY_NO
97};
98
99/**
100 * Peer information that should be kept even before we've connected and
101 * after we've disconnected.  These are kept in a pool of peer_atoms to decide
102 * which ones would make good candidates for connecting to, and to watch out
103 * for banned peers.
104 *
105 * @see tr_peer
106 * @see tr_peermsgs
107 */
108struct peer_atom
109{
110    uint8_t     from;
111    uint8_t     flags;       /* these match the added_f flags */
112    uint8_t     myflags;     /* flags that aren't defined in added_f */
113    uint8_t     uploadOnly;  /* UPLOAD_ONLY_ */
114    tr_port     port;
115    uint16_t    numFails;
116    tr_address  addr;
117    time_t      time;        /* when the peer's connection status last changed */
118    time_t      piece_data_time;
119};
120
121typedef struct
122{
123    tr_bool         isRunning;
124
125    uint8_t         hash[SHA_DIGEST_LENGTH];
126    int         *   pendingRequestCount;
127    tr_ptrArray *   outgoingHandshakes; /* tr_handshake */
128    tr_ptrArray *   pool; /* struct peer_atom */
129    tr_ptrArray *   peers; /* tr_peer */
130    tr_ptrArray *   webseeds; /* tr_webseed */
131    tr_timer *      reconnectTimer;
132    tr_timer *      rechokeTimer;
133    tr_timer *      refillTimer;
134    tr_torrent *    tor;
135    tr_peer *       optimistic; /* the optimistic peer, or NULL if none */
136
137    struct tr_peerMgr * manager;
138}
139Torrent;
140
141struct tr_peerMgr
142{
143    tr_session      * session;
144    tr_ptrArray     * torrents; /* Torrent */
145    tr_ptrArray     * incomingHandshakes; /* tr_handshake */
146    tr_timer        * bandwidthTimer;
147};
148
149#define tordbg( t, ... ) \
150    do { \
151        if( tr_deepLoggingIsActive( ) ) \
152            tr_deepLog( __FILE__, __LINE__, t->tor->info.name, __VA_ARGS__ ); \
153    } while( 0 )
154
155#define dbgmsg( ... ) \
156    do { \
157        if( tr_deepLoggingIsActive( ) ) \
158            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
159    } while( 0 )
160
161/**
162***
163**/
164
165static void
166managerLock( const struct tr_peerMgr * manager )
167{
168    tr_globalLock( manager->session );
169}
170
171static void
172managerUnlock( const struct tr_peerMgr * manager )
173{
174    tr_globalUnlock( manager->session );
175}
176
177static void
178torrentLock( Torrent * torrent )
179{
180    managerLock( torrent->manager );
181}
182
183static void
184torrentUnlock( Torrent * torrent )
185{
186    managerUnlock( torrent->manager );
187}
188
189static int
190torrentIsLocked( const Torrent * t )
191{
192    return tr_globalIsLocked( t->manager->session );
193}
194
195/**
196***
197**/
198
199static int
200handshakeCompareToAddr( const void * va,
201                        const void * vb )
202{
203    const tr_handshake * a = va;
204
205    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
206}
207
208static int
209handshakeCompare( const void * a,
210                  const void * b )
211{
212    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
213}
214
215static tr_handshake*
216getExistingHandshake( tr_ptrArray      * handshakes,
217                      const tr_address * addr )
218{
219    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
220}
221
222static int
223comparePeerAtomToAddress( const void * va, const void * vb )
224{
225    const struct peer_atom * a = va;
226
227    return tr_compareAddresses( &a->addr, vb );
228}
229
230static int
231comparePeerAtoms( const void * va, const void * vb )
232{
233    const struct peer_atom * b = vb;
234
235    return comparePeerAtomToAddress( va, &b->addr );
236}
237
238/**
239***
240**/
241
242static int
243torrentCompare( const void * va,
244                const void * vb )
245{
246    const Torrent * a = va;
247    const Torrent * b = vb;
248
249    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
250}
251
252static int
253torrentCompareToHash( const void * va,
254                      const void * vb )
255{
256    const Torrent * a = va;
257    const uint8_t * b_hash = vb;
258
259    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
260}
261
262static Torrent*
263getExistingTorrent( tr_peerMgr *    manager,
264                    const uint8_t * hash )
265{
266    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
267                                             hash,
268                                             torrentCompareToHash );
269}
270
271static int
272peerCompare( const void * va, const void * vb )
273{
274    const tr_peer * a = va;
275    const tr_peer * b = vb;
276
277    return tr_compareAddresses( &a->addr, &b->addr );
278}
279
280static int
281peerCompareToAddr( const void * va, const void * vb )
282{
283    const tr_peer * a = va;
284
285    return tr_compareAddresses( &a->addr, vb );
286}
287
288static tr_peer*
289getExistingPeer( Torrent          * torrent,
290                 const tr_address * addr )
291{
292    assert( torrentIsLocked( torrent ) );
293    assert( addr );
294
295    return tr_ptrArrayFindSorted( torrent->peers, addr, peerCompareToAddr );
296}
297
298static struct peer_atom*
299getExistingAtom( const Torrent    * t,
300                 const tr_address * addr )
301{
302    assert( torrentIsLocked( t ) );
303    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
304}
305
306static tr_bool
307peerIsInUse( const Torrent    * ct,
308             const tr_address * addr )
309{
310    Torrent * t = (Torrent*) ct;
311
312    assert( torrentIsLocked ( t ) );
313
314    return getExistingPeer( t, addr )
315        || getExistingHandshake( t->outgoingHandshakes, addr )
316        || getExistingHandshake( t->manager->incomingHandshakes, addr );
317}
318
319static tr_peer*
320peerConstructor( tr_torrent * tor, const tr_address * addr )
321{
322    tr_peer * p;
323    p = tr_new0( tr_peer, 1 );
324    p->addr = *addr;
325    p->bandwidth = tr_bandwidthNew( tor->session, tor->bandwidth );
326    return p;
327}
328
329static tr_peer*
330getPeer( Torrent          * torrent,
331         const tr_address * addr )
332{
333    tr_peer * peer;
334
335    assert( torrentIsLocked( torrent ) );
336
337    peer = getExistingPeer( torrent, addr );
338
339    if( peer == NULL )
340    {
341        peer = peerConstructor( torrent->tor, addr );
342        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
343    }
344
345    return peer;
346}
347
348static void
349peerDestructor( tr_peer * peer )
350{
351    assert( peer );
352
353    if( peer->msgs != NULL )
354    {
355        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
356        tr_peerMsgsFree( peer->msgs );
357    }
358
359    tr_peerIoFree( peer->io );
360
361    tr_bitfieldFree( peer->have );
362    tr_bitfieldFree( peer->blame );
363    tr_free( peer->client );
364
365    tr_bandwidthFree( peer->bandwidth );
366
367    tr_free( peer );
368}
369
370static void
371removePeer( Torrent * t,
372            tr_peer * peer )
373{
374    tr_peer *          removed;
375    struct peer_atom * atom;
376
377    assert( torrentIsLocked( t ) );
378
379    atom = getExistingAtom( t, &peer->addr );
380    assert( atom );
381    atom->time = time( NULL );
382
383    removed = tr_ptrArrayRemoveSorted( t->peers, peer, peerCompare );
384    assert( removed == peer );
385    peerDestructor( removed );
386}
387
388static void
389removeAllPeers( Torrent * t )
390{
391    while( !tr_ptrArrayEmpty( t->peers ) )
392        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
393}
394
395static void
396torrentDestructor( void * vt )
397{
398    Torrent * t = vt;
399    uint8_t   hash[SHA_DIGEST_LENGTH];
400
401    assert( t );
402    assert( !t->isRunning );
403    assert( t->peers );
404    assert( torrentIsLocked( t ) );
405    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
406    assert( tr_ptrArrayEmpty( t->peers ) );
407
408    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
409
410    tr_timerFree( &t->reconnectTimer );
411    tr_timerFree( &t->rechokeTimer );
412    tr_timerFree( &t->refillTimer );
413
414    tr_ptrArrayFree( t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
415    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
416    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
417    tr_ptrArrayFree( t->peers, NULL );
418
419    tr_free( t->pendingRequestCount );
420    tr_free( t );
421}
422
423static void peerCallbackFunc( void * vpeer,
424                              void * vevent,
425                              void * vt );
426
427static Torrent*
428torrentConstructor( tr_peerMgr * manager,
429                    tr_torrent * tor )
430{
431    int       i;
432    Torrent * t;
433
434    t = tr_new0( Torrent, 1 );
435    t->manager = manager;
436    t->tor = tor;
437    t->pool = tr_ptrArrayNew( );
438    t->peers = tr_ptrArrayNew( );
439    t->webseeds = tr_ptrArrayNew( );
440    t->outgoingHandshakes = tr_ptrArrayNew( );
441    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
442
443    for( i = 0; i < tor->info.webseedCount; ++i )
444    {
445        tr_webseed * w =
446            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc,
447                           t );
448        tr_ptrArrayAppend( t->webseeds, w );
449    }
450
451    return t;
452}
453
454
455static int bandwidthPulse( void * vmgr );
456
457
458tr_peerMgr*
459tr_peerMgrNew( tr_session * session )
460{
461    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
462
463    m->session = session;
464    m->torrents = tr_ptrArrayNew( );
465    m->incomingHandshakes = tr_ptrArrayNew( );
466    m->bandwidthTimer = tr_timerNew( session, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
467    return m;
468}
469
470void
471tr_peerMgrFree( tr_peerMgr * manager )
472{
473    managerLock( manager );
474
475    tr_timerFree( &manager->bandwidthTimer );
476
477    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
478     * the item from manager->handshakes, so this is a little roundabout... */
479    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
480        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
481
482    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
483
484    /* free the torrents. */
485    tr_ptrArrayFree( manager->torrents, torrentDestructor );
486
487    managerUnlock( manager );
488    tr_free( manager );
489}
490
491static tr_peer**
492getConnectedPeers( Torrent * t, int * setmeCount )
493{
494    int i, peerCount, connectionCount;
495    tr_peer **peers;
496    tr_peer **ret;
497
498    assert( torrentIsLocked( t ) );
499
500    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
501    ret = tr_new( tr_peer *, peerCount );
502
503    for( i = connectionCount = 0; i < peerCount; ++i )
504        if( peers[i]->msgs )
505            ret[connectionCount++] = peers[i];
506
507    *setmeCount = connectionCount;
508    return ret;
509}
510
511static int
512clientIsDownloadingFrom( const tr_peer * peer )
513{
514    return peer->clientIsInterested && !peer->clientIsChoked;
515}
516
517static int
518clientIsUploadingTo( const tr_peer * peer )
519{
520    return peer->peerIsInterested && !peer->peerIsChoked;
521}
522
523/***
524****
525***/
526
527tr_bool
528tr_peerMgrPeerIsSeed( const tr_peerMgr   * mgr,
529                      const uint8_t      * torrentHash,
530                      const tr_address   * addr )
531{
532    tr_bool isSeed = FALSE;
533    const Torrent * t = NULL;
534    const struct peer_atom * atom = NULL;
535
536    t = getExistingTorrent( (tr_peerMgr*)mgr, torrentHash );
537    if( t )
538        atom = getExistingAtom( t, addr );
539    if( atom )
540        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
541
542    return isSeed;
543}
544
545/****
546*****
547*****  REFILL
548*****
549****/
550
551static void
552assertValidPiece( Torrent * t, tr_piece_index_t piece )
553{
554    assert( t );
555    assert( t->tor );
556    assert( piece < t->tor->info.pieceCount );
557}
558
559static int
560getPieceRequests( Torrent * t, tr_piece_index_t piece )
561{
562    assertValidPiece( t, piece );
563
564    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
565}
566
567static void
568incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
569{
570    assertValidPiece( t, piece );
571
572    if( t->pendingRequestCount == NULL )
573        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
574    t->pendingRequestCount[piece]++;
575}
576
577static void
578decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
579{
580    assertValidPiece( t, piece );
581
582    if( t->pendingRequestCount )
583        t->pendingRequestCount[piece]--;
584}
585
586struct tr_refill_piece
587{
588    tr_priority_t    priority;
589    uint32_t         piece;
590    uint32_t         peerCount;
591    int              random;
592    int              pendingRequestCount;
593    int              missingBlockCount;
594};
595
596static int
597compareRefillPiece( const void * aIn, const void * bIn )
598{
599    const struct tr_refill_piece * a = aIn;
600    const struct tr_refill_piece * b = bIn;
601
602    /* if one piece has a higher priority, it goes first */
603    if( a->priority != b->priority )
604        return a->priority > b->priority ? -1 : 1;
605
606    /* have a per-priority endgame */
607    if( a->pendingRequestCount != b->pendingRequestCount )
608        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
609
610    /* fewer missing pieces goes first */
611    if( a->missingBlockCount != b->missingBlockCount )
612        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
613
614    /* otherwise if one has fewer peers, it goes first */
615    if( a->peerCount != b->peerCount )
616        return a->peerCount < b->peerCount ? -1 : 1;
617
618    /* otherwise go with our random seed */
619    if( a->random != b->random )
620        return a->random < b->random ? -1 : 1;
621
622    return 0;
623}
624
625static tr_piece_index_t *
626getPreferredPieces( Torrent * t, tr_piece_index_t  * pieceCount )
627{
628    const tr_torrent  * tor = t->tor;
629    const tr_info     * inf = &tor->info;
630    tr_piece_index_t    i;
631    tr_piece_index_t    poolSize = 0;
632    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
633    int                 peerCount;
634    tr_peer**           peers;
635
636    assert( torrentIsLocked( t ) );
637
638    peers = getConnectedPeers( t, &peerCount );
639
640    /* make a list of the pieces that we want but don't have */
641    for( i = 0; i < inf->pieceCount; ++i )
642        if( !tor->info.pieces[i].dnd
643                && !tr_cpPieceIsComplete( tor->completion, i ) )
644            pool[poolSize++] = i;
645
646    /* sort the pool by which to request next */
647    if( poolSize > 1 )
648    {
649        tr_piece_index_t j;
650        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
651
652        for( j = 0; j < poolSize; ++j )
653        {
654            int k;
655            const tr_piece_index_t piece = pool[j];
656            struct tr_refill_piece * setme = p + j;
657
658            setme->piece = piece;
659            setme->priority = inf->pieces[piece].priority;
660            setme->peerCount = 0;
661            setme->random = tr_cryptoWeakRandInt( INT_MAX );
662            setme->pendingRequestCount = getPieceRequests( t, piece );
663            setme->missingBlockCount
664                         = tr_cpMissingBlocksInPiece( tor->completion, piece );
665
666            for( k = 0; k < peerCount; ++k )
667            {
668                const tr_peer * peer = peers[k];
669                if( peer->peerIsInterested
670                        && !peer->clientIsChoked
671                        && tr_bitfieldHas( peer->have, piece ) )
672                    ++setme->peerCount;
673            }
674        }
675
676        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
677               compareRefillPiece );
678
679        for( j = 0; j < poolSize; ++j )
680            pool[j] = p[j].piece;
681
682        tr_free( p );
683    }
684
685    tr_free( peers );
686
687    *pieceCount = poolSize;
688    return pool;
689}
690
691struct tr_blockIterator
692{
693    Torrent * t;
694    tr_block_index_t blockIndex, blockCount, *blocks;
695    tr_piece_index_t pieceIndex, pieceCount, *pieces;
696};
697
698static struct tr_blockIterator*
699blockIteratorNew( Torrent * t )
700{
701    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
702    i->t = t;
703    i->pieces = getPreferredPieces( t, &i->pieceCount );
704    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCount );
705    return i;
706}
707
708static int
709blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
710{
711    int found;
712    Torrent * t = i->t;
713    tr_torrent * tor = t->tor;
714
715    while( ( i->blockIndex == i->blockCount )
716        && ( i->pieceIndex < i->pieceCount ) )
717    {
718        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
719        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
720        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
721        tr_block_index_t block;
722
723        assert( index < tor->info.pieceCount );
724
725        i->blockCount = 0;
726        i->blockIndex = 0;
727        for( block=b; block!=e; ++block )
728            if( !tr_cpBlockIsComplete( tor->completion, block ) )
729                i->blocks[i->blockCount++] = block;
730    }
731
732    if(( found = ( i->blockIndex < i->blockCount )))
733        *setme = i->blocks[i->blockIndex++];
734
735    return found;
736}
737
738static void
739blockIteratorFree( struct tr_blockIterator * i )
740{
741    tr_free( i->blocks );
742    tr_free( i->pieces );
743    tr_free( i );
744}
745
746static tr_peer**
747getPeersUploadingToClient( Torrent * t,
748                           int *     setmeCount )
749{
750    int j;
751    int peerCount = 0;
752    int retCount = 0;
753    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
754    tr_peer ** ret = tr_new( tr_peer *, peerCount );
755
756    j = 0; /* this is a temporary test to make sure we walk through all the peers */
757    if( peerCount )
758    {
759        /* Get a list of peers we're downloading from.
760           Pick a different starting point each time so all peers
761           get a chance at being the first in line */
762        const int fencepost = tr_cryptoWeakRandInt( peerCount );
763        int i = fencepost;
764        do {
765            if( clientIsDownloadingFrom( peers[i] ) )
766                ret[retCount++] = peers[i];
767            i = ( i + 1 ) % peerCount;
768            ++j;
769        } while( i != fencepost );
770    }
771    assert( j == peerCount );
772    *setmeCount = retCount;
773    return ret;
774}
775
776static uint32_t
777getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
778{
779    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
780    const uint64_t blockPos = tor->blockSize * b;
781    assert( blockPos >= piecePos );
782    return (uint32_t)( blockPos - piecePos );
783}
784
785static int
786refillPulse( void * vtorrent )
787{
788    tr_block_index_t block;
789    int peerCount;
790    int webseedCount;
791    tr_peer ** peers;
792    tr_webseed ** webseeds;
793    struct tr_blockIterator * blockIterator;
794    Torrent * t = vtorrent;
795    tr_torrent * tor = t->tor;
796
797    if( !t->isRunning )
798        return TRUE;
799    if( tr_torrentIsSeed( t->tor ) )
800        return TRUE;
801
802    torrentLock( t );
803    tordbg( t, "Refilling Request Buffers..." );
804
805    blockIterator = blockIteratorNew( t );
806    peers = getPeersUploadingToClient( t, &peerCount );
807    webseedCount = tr_ptrArraySize( t->webseeds );
808    webseeds = tr_memdup( tr_ptrArrayBase( t->webseeds ),
809                          webseedCount * sizeof( tr_webseed* ) );
810
811    while( ( webseedCount || peerCount )
812        && blockIteratorNext( blockIterator, &block ) )
813    {
814        int j;
815        int handled = FALSE;
816
817        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
818        const uint32_t offset = getBlockOffsetInPiece( tor, block );
819        const uint32_t length = tr_torBlockCountBytes( tor, block );
820
821        /* find a peer who can ask for this block */
822        for( j=0; !handled && j<peerCount; )
823        {
824            const tr_addreq_t val = tr_peerMsgsAddRequest( peers[j]->msgs, index, offset, length, FALSE );
825            switch( val )
826            {
827                case TR_ADDREQ_FULL:
828                case TR_ADDREQ_CLIENT_CHOKED:
829                    peers[j] = peers[--peerCount];
830                    break;
831
832                case TR_ADDREQ_MISSING:
833                case TR_ADDREQ_DUPLICATE:
834                    ++j;
835                    break;
836
837                case TR_ADDREQ_OK:
838                    incrementPieceRequests( t, index );
839                    handled = TRUE;
840                    break;
841
842                default:
843                    assert( 0 && "unhandled value" );
844                    break;
845            }
846        }
847
848        /* maybe one of the webseeds can do it */
849        for( j=0; !handled && j<webseedCount; )
850        {
851            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j],
852                                                          index, offset, length );
853            switch( val )
854            {
855                case TR_ADDREQ_FULL:
856                    webseeds[j] = webseeds[--webseedCount];
857                    break;
858
859                case TR_ADDREQ_OK:
860                    incrementPieceRequests( t, index );
861                    handled = TRUE;
862                    break;
863
864                default:
865                    assert( 0 && "unhandled value" );
866                    break;
867            }
868        }
869    }
870
871    /* cleanup */
872    blockIteratorFree( blockIterator );
873    tr_free( webseeds );
874    tr_free( peers );
875
876    t->refillTimer = NULL;
877    torrentUnlock( t );
878    return FALSE;
879}
880
881static void
882broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
883{
884    int i, size;
885    tr_peer ** peers;
886
887    assert( torrentIsLocked( t ) );
888
889    tordbg( t, "got a block; cancelling any duplicate requests from peers %"PRIu32":%"PRIu32"->%"PRIu32, index, offset, length );
890    peers = getConnectedPeers( t, &size );
891    for( i=0; i<size; ++i )
892        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
893    tr_free( peers );
894}
895
896static void
897addStrike( Torrent * t,
898           tr_peer * peer )
899{
900    tordbg( t, "increasing peer %s strike count to %d",
901            tr_peerIoAddrStr( &peer->addr,
902                              peer->port ), peer->strikes + 1 );
903
904    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
905    {
906        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
907        atom->myflags |= MYFLAG_BANNED;
908        peer->doPurge = 1;
909        tordbg( t, "banning peer %s",
910               tr_peerIoAddrStr( &atom->addr, atom->port ) );
911    }
912}
913
914static void
915gotBadPiece( Torrent *        t,
916             tr_piece_index_t pieceIndex )
917{
918    tr_torrent *   tor = t->tor;
919    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
920
921    tor->corruptCur += byteCount;
922    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
923}
924
925static void
926refillSoon( Torrent * t )
927{
928    if( t->refillTimer == NULL )
929        t->refillTimer = tr_timerNew( t->manager->session,
930                                      refillPulse, t,
931                                      REFILL_PERIOD_MSEC );
932}
933
934static void
935peerSuggestedPiece( Torrent            * t,
936                    tr_peer            * peer,
937                    tr_piece_index_t     pieceIndex,
938                    int                  isFastAllowed )
939{
940    assert( t );
941    assert( peer );
942    assert( peer->msgs );
943
944    /* is this a valid piece? */
945    if(  pieceIndex >= t->tor->info.pieceCount )
946        return;
947
948    /* don't ask for it if we've already got it */
949    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
950        return;
951
952    /* don't ask for it if they don't have it */
953    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
954        return;
955
956    /* don't ask for it if we're choked and it's not fast */
957    if( !isFastAllowed && peer->clientIsChoked )
958        return;
959
960    /* request the blocks that we don't have in this piece */
961    {
962        tr_block_index_t block;
963        const tr_torrent * tor = t->tor;
964        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
965        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
966
967        for( block=start; block<end; ++block )
968        {
969            if( !tr_cpBlockIsComplete( tor->completion, block ) )
970            {
971                const uint32_t offset = getBlockOffsetInPiece( tor, block );
972                const uint32_t length = tr_torBlockCountBytes( tor, block );
973                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length, TRUE );
974                incrementPieceRequests( t, pieceIndex );
975            }
976        }
977    }
978}
979
980static void
981peerCallbackFunc( void * vpeer,
982                  void * vevent,
983                  void * vt )
984{
985    tr_peer             * peer = vpeer; /* may be NULL if peer is a webseed */
986    Torrent             * t = vt;
987    const tr_peer_event * e = vevent;
988
989    torrentLock( t );
990
991    switch( e->eventType )
992    {
993        case TR_PEER_UPLOAD_ONLY:
994            /* update our atom */
995            if( peer ) {
996                struct peer_atom * a = getExistingAtom( t, &peer->addr );
997                a->uploadOnly = e->uploadOnly ? UPLOAD_ONLY_YES : UPLOAD_ONLY_NO;
998            }
999            break;
1000
1001        case TR_PEER_NEED_REQ:
1002            refillSoon( t );
1003            break;
1004
1005        case TR_PEER_CANCEL:
1006            decrementPieceRequests( t, e->pieceIndex );
1007            break;
1008
1009        case TR_PEER_PEER_GOT_DATA:
1010        {
1011            const time_t now = time( NULL );
1012            tr_torrent * tor = t->tor;
1013
1014            tor->activityDate = now;
1015
1016            if( e->wasPieceData )
1017                tor->uploadedCur += e->length;
1018
1019            /* update the stats */
1020            if( e->wasPieceData )
1021                tr_statsAddUploaded( tor->session, e->length );
1022
1023            /* update our atom */
1024            if( peer ) {
1025                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1026                if( e->wasPieceData )
1027                    a->piece_data_time = now;
1028            }
1029
1030            break;
1031        }
1032
1033        case TR_PEER_CLIENT_GOT_SUGGEST:
1034            if( peer )
1035                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1036            break;
1037
1038        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1039            if( peer )
1040                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1041            break;
1042
1043        case TR_PEER_CLIENT_GOT_DATA:
1044        {
1045            const time_t now = time( NULL );
1046            tr_torrent * tor = t->tor;
1047            tor->activityDate = now;
1048
1049            /* only add this to downloadedCur if we got it from a peer --
1050             * webseeds shouldn't count against our ratio.  As one tracker
1051             * admin put it, "Those pieces are downloaded directly from the
1052             * content distributor, not the peers, it is the tracker's job
1053             * to manage the swarms, not the web server and does not fit
1054             * into the jurisdiction of the tracker." */
1055            if( peer && e->wasPieceData )
1056                tor->downloadedCur += e->length;
1057
1058            /* update the stats */ 
1059            if( e->wasPieceData )
1060                tr_statsAddDownloaded( tor->session, e->length );
1061
1062            /* update our atom */
1063            if( peer ) {
1064                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1065                if( e->wasPieceData )
1066                    a->piece_data_time = now;
1067            }
1068
1069            break;
1070        }
1071
1072        case TR_PEER_PEER_PROGRESS:
1073        {
1074            if( peer )
1075            {
1076                struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1077                const int peerIsSeed = e->progress >= 1.0;
1078                if( peerIsSeed ) {
1079                    tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1080                    atom->flags |= ADDED_F_SEED_FLAG;
1081                } else {
1082                    tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1083                    atom->flags &= ~ADDED_F_SEED_FLAG;
1084                }
1085            }
1086            break;
1087        }
1088
1089        case TR_PEER_CLIENT_GOT_BLOCK:
1090        {
1091            tr_torrent *     tor = t->tor;
1092
1093            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1094
1095            tr_cpBlockAdd( tor->completion, block );
1096            decrementPieceRequests( t, e->pieceIndex );
1097
1098            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1099
1100            if( tr_cpPieceIsComplete( tor->completion, e->pieceIndex ) )
1101            {
1102                const tr_piece_index_t p = e->pieceIndex;
1103                const tr_bool ok = tr_ioTestPiece( tor, p );
1104
1105                if( !ok )
1106                {
1107                    tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1108                               (unsigned long)p );
1109                }
1110
1111                tr_torrentSetHasPiece( tor, p, ok );
1112                tr_torrentSetPieceChecked( tor, p, TRUE );
1113                tr_peerMgrSetBlame( tor->session->peerMgr, tor->info.hash, p, ok );
1114
1115                if( !ok )
1116                    gotBadPiece( t, p );
1117                else
1118                {
1119                    int i, peerCount;
1120                    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1121                    for( i = 0; i < peerCount; ++i )
1122                        tr_peerMsgsHave( peers[i]->msgs, p );
1123                    tr_free( peers );
1124                }
1125
1126                tr_torrentRecheckCompleteness( tor );
1127            }
1128            break;
1129        }
1130
1131        case TR_PEER_ERROR:
1132            if( e->err == EINVAL )
1133            {
1134                addStrike( t, peer );
1135                peer->doPurge = 1;
1136                tordbg( t, "setting doPurge because we got an EINVAL error" );
1137            }
1138            else if( ( e->err == ERANGE )
1139                  || ( e->err == EMSGSIZE )
1140                  || ( e->err == ENOTCONN ) )
1141            {
1142                /* some protocol error from the peer */
1143                peer->doPurge = 1;
1144                tordbg( t, "setting doPurge because we got an ERANGE, EMSGSIZE, or ENOTCONN error" );
1145            }
1146            else /* a local error, such as an IO error */
1147            {
1148                t->tor->error = e->err;
1149                tr_strlcpy( t->tor->errorString,
1150                            tr_strerror( t->tor->error ),
1151                            sizeof( t->tor->errorString ) );
1152                tr_torrentStop( t->tor );
1153            }
1154            break;
1155
1156        default:
1157            assert( 0 );
1158    }
1159
1160    torrentUnlock( t );
1161}
1162
1163static void
1164ensureAtomExists( Torrent          * t,
1165                  const tr_address * addr,
1166                  tr_port            port,
1167                  uint8_t            flags,
1168                  uint8_t            from )
1169{
1170    if( getExistingAtom( t, addr ) == NULL )
1171    {
1172        struct peer_atom * a;
1173        a = tr_new0( struct peer_atom, 1 );
1174        a->addr = *addr;
1175        a->port = port;
1176        a->flags = flags;
1177        a->from = from;
1178        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr( &a->addr, a->port ) );
1179        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
1180    }
1181}
1182
1183static int
1184getMaxPeerCount( const tr_torrent * tor )
1185{
1186    return tor->maxConnectedPeers;
1187}
1188
1189static int
1190getPeerCount( const Torrent * t )
1191{
1192    return tr_ptrArraySize( t->peers ) + tr_ptrArraySize( t->outgoingHandshakes );
1193}
1194
1195/* FIXME: this is kind of a mess. */
1196static tr_bool
1197myHandshakeDoneCB( tr_handshake *  handshake,
1198                   tr_peerIo *     io,
1199                   int             isConnected,
1200                   const uint8_t * peer_id,
1201                   void *          vmanager )
1202{
1203    tr_bool            ok = isConnected;
1204    tr_bool            success = FALSE;
1205    tr_port            port;
1206    const tr_address * addr;
1207    tr_peerMgr       * manager = vmanager;
1208    Torrent          * t;
1209    tr_handshake     * ours;
1210
1211    assert( io );
1212    assert( isConnected == 0 || isConnected == 1 );
1213
1214    t = tr_peerIoHasTorrentHash( io )
1215        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1216        : NULL;
1217
1218    if( tr_peerIoIsIncoming ( io ) )
1219        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
1220                                        handshake, handshakeCompare );
1221    else if( t )
1222        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
1223                                        handshake, handshakeCompare );
1224    else
1225        ours = handshake;
1226
1227    assert( ours );
1228    assert( ours == handshake );
1229
1230    if( t )
1231        torrentLock( t );
1232
1233    addr = tr_peerIoGetAddress( io, &port );
1234
1235    if( !ok || !t || !t->isRunning )
1236    {
1237        if( t )
1238        {
1239            struct peer_atom * atom = getExistingAtom( t, addr );
1240            if( atom )
1241                ++atom->numFails;
1242        }
1243
1244        tr_peerIoFree( io );
1245    }
1246    else /* looking good */
1247    {
1248        struct peer_atom * atom;
1249        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1250        atom = getExistingAtom( t, addr );
1251        atom->time = time( NULL );
1252        atom->piece_data_time = 0;
1253
1254        if( atom->myflags & MYFLAG_BANNED )
1255        {
1256            tordbg( t, "banned peer %s tried to reconnect",
1257                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
1258            tr_peerIoFree( io );
1259        }
1260        else if( tr_peerIoIsIncoming( io )
1261               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1262
1263        {
1264            tr_peerIoFree( io );
1265        }
1266        else
1267        {
1268            tr_peer * peer = getExistingPeer( t, addr );
1269
1270            if( peer ) /* we already have this peer */
1271            {
1272                tr_peerIoFree( io );
1273            }
1274            else
1275            {
1276                peer = getPeer( t, addr );
1277                tr_free( peer->client );
1278
1279                if( !peer_id )
1280                    peer->client = NULL;
1281                else {
1282                    char client[128];
1283                    tr_clientForId( client, sizeof( client ), peer_id );
1284                    peer->client = tr_strdup( client );
1285                }
1286
1287                peer->port = port;
1288                peer->io = io;
1289                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1290                tr_peerIoSetBandwidth( io, peer->bandwidth );
1291
1292                success = TRUE;
1293            }
1294        }
1295    }
1296
1297    if( t )
1298        torrentUnlock( t );
1299
1300    return success;
1301}
1302
1303void
1304tr_peerMgrAddIncoming( tr_peerMgr * manager,
1305                       tr_address * addr,
1306                       tr_port      port,
1307                       int          socket )
1308{
1309    managerLock( manager );
1310
1311    if( tr_sessionIsAddressBlocked( manager->session, addr ) )
1312    {
1313        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
1314        tr_netClose( socket );
1315    }
1316    else if( getExistingHandshake( manager->incomingHandshakes, addr ) )
1317    {
1318        tr_netClose( socket );
1319    }
1320    else /* we don't have a connetion to them yet... */
1321    {
1322        tr_peerIo *    io;
1323        tr_handshake * handshake;
1324
1325        io = tr_peerIoNewIncoming( manager->session, addr, port, socket );
1326
1327        handshake = tr_handshakeNew( io,
1328                                     manager->session->encryptionMode,
1329                                     myHandshakeDoneCB,
1330                                     manager );
1331
1332        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake,
1333                                 handshakeCompare );
1334    }
1335
1336    managerUnlock( manager );
1337}
1338
1339static int
1340tr_isPex( const tr_pex * pex )
1341{
1342    return pex && tr_isAddress( &pex->addr );
1343}
1344
1345void
1346tr_peerMgrAddPex( tr_peerMgr *    manager,
1347                  const uint8_t * torrentHash,
1348                  uint8_t         from,
1349                  const tr_pex *  pex )
1350{
1351    Torrent * t;
1352    managerLock( manager );
1353
1354    assert( tr_isPex( pex ) );
1355
1356    t = getExistingTorrent( manager, torrentHash );
1357    if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
1358        ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from );
1359   
1360    managerUnlock( manager );
1361}
1362
1363tr_pex *
1364tr_peerMgrCompactToPex( const void *    compact,
1365                        size_t          compactLen,
1366                        const uint8_t * added_f,
1367                        size_t          added_f_len,
1368                        size_t *        pexCount )
1369{
1370    size_t          i;
1371    size_t          n = compactLen / 6;
1372    const uint8_t * walk = compact;
1373    tr_pex *        pex = tr_new0( tr_pex, n );
1374
1375    for( i = 0; i < n; ++i )
1376    {
1377        pex[i].addr.type = TR_AF_INET;
1378        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
1379        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1380        if( added_f && ( n == added_f_len ) )
1381            pex[i].flags = added_f[i];
1382    }
1383
1384    *pexCount = n;
1385    return pex;
1386}
1387
1388tr_pex *
1389tr_peerMgrCompact6ToPex( const void    * compact,
1390                         size_t          compactLen,
1391                         const uint8_t * added_f,
1392                         size_t          added_f_len,
1393                         size_t        * pexCount )
1394{
1395    size_t          i;
1396    size_t          n = compactLen / 18;
1397    const uint8_t * walk = compact;
1398    tr_pex *        pex = tr_new0( tr_pex, n );
1399   
1400    for( i = 0; i < n; ++i )
1401    {
1402        pex[i].addr.type = TR_AF_INET6;
1403        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
1404        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1405        if( added_f && ( n == added_f_len ) )
1406            pex[i].flags = added_f[i];
1407    }
1408   
1409    *pexCount = n;
1410    return pex;
1411}
1412
1413tr_pex *
1414tr_peerMgrArrayToPex( const void * array,
1415                      size_t       arrayLen,
1416                      size_t      * pexCount )
1417{
1418    size_t          i;
1419    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
1420    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
1421    const uint8_t * walk = array;
1422    tr_pex        * pex = tr_new0( tr_pex, n );
1423   
1424    for( i = 0 ; i < n ; i++ ) {
1425        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
1426        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
1427        pex[i].flags = 0x00;
1428        walk += sizeof( tr_address ) + 2;
1429    }
1430   
1431    *pexCount = n;
1432    return pex;
1433}
1434
1435/**
1436***
1437**/
1438
1439void
1440tr_peerMgrSetBlame( tr_peerMgr *     manager,
1441                    const uint8_t *  torrentHash,
1442                    tr_piece_index_t pieceIndex,
1443                    int              success )
1444{
1445    if( !success )
1446    {
1447        int        peerCount, i;
1448        Torrent *  t = getExistingTorrent( manager, torrentHash );
1449        tr_peer ** peers;
1450
1451        assert( torrentIsLocked( t ) );
1452
1453        peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1454        for( i = 0; i < peerCount; ++i )
1455        {
1456            tr_peer * peer = peers[i];
1457            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1458            {
1459                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1460                        tr_peerIoAddrStr( &peer->addr, peer->port ),
1461                        pieceIndex, (int)peer->strikes + 1 );
1462                addStrike( t, peer );
1463            }
1464        }
1465    }
1466}
1467
1468int
1469tr_pexCompare( const void * va, const void * vb )
1470{
1471    const tr_pex * a = va;
1472    const tr_pex * b = vb;
1473    int i;
1474
1475    assert( tr_isPex( a ) );
1476    assert( tr_isPex( b ) );
1477
1478    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
1479        return i;
1480
1481    if( a->port != b->port )
1482        return a->port < b->port ? -1 : 1;
1483
1484    return 0;
1485}
1486
1487static int
1488peerPrefersCrypto( const tr_peer * peer )
1489{
1490    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1491        return TRUE;
1492
1493    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1494        return FALSE;
1495
1496    return tr_peerIoIsEncrypted( peer->io );
1497}
1498
1499int
1500tr_peerMgrGetPeers( tr_peerMgr      * manager,
1501                    const uint8_t   * torrentHash,
1502                    tr_pex         ** setme_pex,
1503                    uint8_t           af)
1504{
1505    int peerCount = 0;
1506    int peersReturning = 0;
1507    const Torrent *  t;
1508
1509    managerLock( manager );
1510
1511    t = getExistingTorrent( manager, torrentHash );
1512    if( t == NULL )
1513    {
1514        *setme_pex = NULL;
1515    }
1516    else
1517    {
1518        int i;
1519        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1520        /* for now, this will waste memory on torrents that have both
1521         * ipv6 and ipv4 peers */
1522        tr_pex * pex = tr_new( tr_pex, peerCount );
1523        tr_pex * walk = pex;
1524
1525        for( i=0; i<peerCount; ++i )
1526        {
1527            const tr_peer * peer = peers[i];
1528            if( peer->addr.type == af )
1529            {
1530                const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1531                assert( tr_isAddress( &peer->addr ) );
1532                walk->addr = peer->addr;
1533                walk->port = peer->port;
1534                walk->flags = 0;
1535                if( peerPrefersCrypto( peer ) )
1536                    walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1537                if( ( atom->uploadOnly == UPLOAD_ONLY_YES ) ||
1538                    ( peer->progress >= 1.0 ) )
1539                    walk->flags |= ADDED_F_SEED_FLAG;
1540                ++peersReturning;
1541                ++walk;
1542            }
1543        }
1544
1545#warning this for loop can be removed when we are sure the bug is fixed
1546        for( i=0; i<peersReturning; ++i )
1547            assert( tr_isAddress( &pex[i].addr ) );
1548
1549        assert( ( walk - pex ) == peersReturning );
1550        qsort( pex, peersReturning, sizeof( tr_pex ), tr_pexCompare );
1551
1552#warning this for loop can be removed when we are sure the bug is fixed
1553        for( i=0; i<peersReturning; ++i )
1554            assert( tr_isAddress( &pex[i].addr ) );
1555
1556        *setme_pex = pex;
1557    }
1558
1559    managerUnlock( manager );
1560    return peersReturning;
1561}
1562
1563static int reconnectPulse( void * vtorrent );
1564
1565static int rechokePulse( void * vtorrent );
1566
1567void
1568tr_peerMgrStartTorrent( tr_peerMgr *    manager,
1569                        const uint8_t * torrentHash )
1570{
1571    Torrent * t;
1572
1573    managerLock( manager );
1574
1575    t = getExistingTorrent( manager, torrentHash );
1576
1577    assert( t );
1578    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1579    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1580
1581    if( !t->isRunning )
1582    {
1583        t->isRunning = 1;
1584
1585        t->reconnectTimer = tr_timerNew( t->manager->session,
1586                                         reconnectPulse, t,
1587                                         RECONNECT_PERIOD_MSEC );
1588
1589        t->rechokeTimer = tr_timerNew( t->manager->session,
1590                                       rechokePulse, t,
1591                                       RECHOKE_PERIOD_MSEC );
1592
1593        reconnectPulse( t );
1594
1595        rechokePulse( t );
1596
1597        if( !tr_ptrArrayEmpty( t->webseeds ) )
1598            refillSoon( t );
1599    }
1600
1601    managerUnlock( manager );
1602}
1603
1604static void
1605stopTorrent( Torrent * t )
1606{
1607    assert( torrentIsLocked( t ) );
1608
1609    t->isRunning = 0;
1610    tr_timerFree( &t->rechokeTimer );
1611    tr_timerFree( &t->reconnectTimer );
1612
1613    /* disconnect the peers. */
1614    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)peerDestructor );
1615    tr_ptrArrayClear( t->peers );
1616
1617    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1618     * which removes the handshake from t->outgoingHandshakes... */
1619    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1620        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1621}
1622
1623void
1624tr_peerMgrStopTorrent( tr_peerMgr *    manager,
1625                       const uint8_t * torrentHash )
1626{
1627    managerLock( manager );
1628
1629    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1630
1631    managerUnlock( manager );
1632}
1633
1634void
1635tr_peerMgrAddTorrent( tr_peerMgr * manager,
1636                      tr_torrent * tor )
1637{
1638    Torrent * t;
1639
1640    managerLock( manager );
1641
1642    assert( tor );
1643    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1644
1645    t = torrentConstructor( manager, tor );
1646    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1647
1648    managerUnlock( manager );
1649}
1650
1651void
1652tr_peerMgrRemoveTorrent( tr_peerMgr *    manager,
1653                         const uint8_t * torrentHash )
1654{
1655    Torrent * t;
1656
1657    managerLock( manager );
1658
1659    t = getExistingTorrent( manager, torrentHash );
1660    assert( t );
1661    stopTorrent( t );
1662    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1663    torrentDestructor( t );
1664
1665    managerUnlock( manager );
1666}
1667
1668void
1669tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1670                               const uint8_t *    torrentHash,
1671                               int8_t *           tab,
1672                               unsigned int       tabCount )
1673{
1674    tr_piece_index_t   i;
1675    const Torrent *    t;
1676    const tr_torrent * tor;
1677    float              interval;
1678    int                isSeed;
1679    int                peerCount;
1680    const tr_peer **   peers;
1681
1682    managerLock( manager );
1683
1684    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1685    tor = t->tor;
1686    interval = tor->info.pieceCount / (float)tabCount;
1687    isSeed = tor && ( tr_cpGetStatus ( tor->completion ) == TR_SEED );
1688    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1689
1690    memset( tab, 0, tabCount );
1691
1692    for( i = 0; tor && i < tabCount; ++i )
1693    {
1694        const int piece = i * interval;
1695
1696        if( isSeed || tr_cpPieceIsComplete( tor->completion, piece ) )
1697            tab[i] = -1;
1698        else if( peerCount )
1699        {
1700            int j;
1701            for( j = 0; j < peerCount; ++j )
1702                if( tr_bitfieldHas( peers[j]->have, i ) )
1703                    ++tab[i];
1704        }
1705    }
1706
1707    managerUnlock( manager );
1708}
1709
1710/* Returns the pieces that are available from peers */
1711tr_bitfield*
1712tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1713                        const uint8_t *    torrentHash )
1714{
1715    int           i, size;
1716    Torrent *     t;
1717    tr_peer **    peers;
1718    tr_bitfield * pieces;
1719    managerLock( manager );
1720
1721    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1722    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1723    peers = getConnectedPeers( t, &size );
1724    for( i = 0; i < size; ++i )
1725        tr_bitfieldOr( pieces, peers[i]->have );
1726
1727    managerUnlock( manager );
1728    tr_free( peers );
1729    return pieces;
1730}
1731
1732int
1733tr_peerMgrHasConnections( const tr_peerMgr * manager,
1734                          const uint8_t *    torrentHash )
1735{
1736    int ret;
1737    const Torrent * t;
1738    managerLock( manager );
1739
1740    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1741    ret = t && ( !tr_ptrArrayEmpty( t->peers ) || !tr_ptrArrayEmpty( t->webseeds ) );
1742
1743    managerUnlock( manager );
1744    return ret;
1745}
1746
1747void
1748tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1749                        const uint8_t    * torrentHash,
1750                        int              * setmePeersKnown,
1751                        int              * setmePeersConnected,
1752                        int              * setmeSeedsConnected,
1753                        int              * setmeWebseedsSendingToUs,
1754                        int              * setmePeersSendingToUs,
1755                        int              * setmePeersGettingFromUs,
1756                        int              * setmePeersFrom )
1757{
1758    int i, size;
1759    const Torrent * t;
1760    const tr_peer ** peers;
1761    const tr_webseed ** webseeds;
1762
1763    managerLock( manager );
1764
1765    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1766    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1767
1768    *setmePeersKnown           = tr_ptrArraySize( t->pool );
1769    *setmePeersConnected       = 0;
1770    *setmeSeedsConnected       = 0;
1771    *setmePeersGettingFromUs   = 0;
1772    *setmePeersSendingToUs     = 0;
1773    *setmeWebseedsSendingToUs  = 0;
1774
1775    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1776        setmePeersFrom[i] = 0;
1777
1778    for( i=0; i<size; ++i )
1779    {
1780        const tr_peer * peer = peers[i];
1781        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1782
1783        if( peer->io == NULL ) /* not connected */
1784            continue;
1785
1786        ++*setmePeersConnected;
1787
1788        ++setmePeersFrom[atom->from];
1789
1790        if( clientIsDownloadingFrom( peer ) )
1791            ++*setmePeersSendingToUs;
1792
1793        if( clientIsUploadingTo( peer ) )
1794            ++*setmePeersGettingFromUs;
1795
1796        if( atom->flags & ADDED_F_SEED_FLAG )
1797            ++*setmeSeedsConnected;
1798    }
1799
1800    webseeds = (const tr_webseed **) tr_ptrArrayPeek( t->webseeds, &size );
1801    for( i=0; i<size; ++i )
1802    {
1803        if( tr_webseedIsActive( webseeds[i] ) )
1804            ++*setmeWebseedsSendingToUs;
1805    }
1806
1807    managerUnlock( manager );
1808}
1809
1810float*
1811tr_peerMgrWebSpeeds( const tr_peerMgr * manager,
1812                     const uint8_t *    torrentHash )
1813{
1814    const Torrent * t;
1815    const tr_webseed ** webseeds;
1816    int i;
1817    int webseedCount;
1818    float * ret;
1819
1820    assert( manager );
1821    managerLock( manager );
1822
1823    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1824    webseeds = (const tr_webseed**) tr_ptrArrayPeek( t->webseeds,
1825                                                     &webseedCount );
1826    assert( webseedCount == t->tor->info.webseedCount );
1827    ret = tr_new0( float, webseedCount );
1828
1829    for( i=0; i<webseedCount; ++i )
1830        if( !tr_webseedGetSpeed( webseeds[i], &ret[i] ) )
1831            ret[i] = -1.0;
1832
1833    managerUnlock( manager );
1834    return ret;
1835}
1836
1837double
1838tr_peerGetPieceSpeed( const tr_peer * peer, tr_direction direction )
1839{
1840    assert( peer );
1841    assert( direction==TR_CLIENT_TO_PEER || direction==TR_PEER_TO_CLIENT );
1842
1843    return tr_bandwidthGetPieceSpeed( peer->bandwidth, direction );
1844}
1845
1846
1847struct tr_peer_stat *
1848tr_peerMgrPeerStats( const   tr_peerMgr  * manager,
1849                     const   uint8_t     * torrentHash,
1850                     int                 * setmeCount UNUSED )
1851{
1852    int             i, size;
1853    const Torrent * t;
1854    tr_peer **      peers;
1855    tr_peer_stat *  ret;
1856
1857    assert( manager );
1858    managerLock( manager );
1859
1860    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1861    peers = getConnectedPeers( (Torrent*)t, &size );
1862    ret = tr_new0( tr_peer_stat, size );
1863
1864    for( i = 0; i < size; ++i )
1865    {
1866        char *                   pch;
1867        const tr_peer *          peer = peers[i];
1868        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1869        tr_peer_stat *           stat = ret + i;
1870        tr_address               norm_addr;
1871
1872        memcpy( &norm_addr, &peer->addr, sizeof( tr_address ) );
1873        tr_normalizeV4Mapped( &norm_addr );
1874        tr_ntop( &norm_addr, stat->addr, sizeof( stat->addr ) );
1875        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
1876                   sizeof( stat->client ) );
1877        stat->port               = ntohs( peer->port );
1878        stat->from               = atom->from;
1879        stat->progress           = peer->progress;
1880        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1881        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER );
1882        stat->rateToClient       = tr_peerGetPieceSpeed( peer, TR_PEER_TO_CLIENT );
1883        stat->peerIsChoked       = peer->peerIsChoked;
1884        stat->peerIsInterested   = peer->peerIsInterested;
1885        stat->clientIsChoked     = peer->clientIsChoked;
1886        stat->clientIsInterested = peer->clientIsInterested;
1887        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1888        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1889        stat->isUploadingTo      = clientIsUploadingTo( peer );
1890        stat->isSeed             = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
1891
1892        pch = stat->flagStr;
1893        if( t->optimistic == peer ) *pch++ = 'O';
1894        if( stat->isDownloadingFrom ) *pch++ = 'D';
1895        else if( stat->clientIsInterested ) *pch++ = 'd';
1896        if( stat->isUploadingTo ) *pch++ = 'U';
1897        else if( stat->peerIsInterested ) *pch++ = 'u';
1898        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
1899        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1900        if( stat->isEncrypted ) *pch++ = 'E';
1901        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1902        if( stat->isIncoming ) *pch++ = 'I';
1903        *pch = '\0';
1904    }
1905
1906    *setmeCount = size;
1907    tr_free( peers );
1908
1909    managerUnlock( manager );
1910    return ret;
1911}
1912
1913/**
1914***
1915**/
1916
1917struct ChokeData
1918{
1919    tr_bool         doUnchoke;
1920    tr_bool         isInterested;
1921    tr_bool         isChoked;
1922    int             rate;
1923    tr_peer *       peer;
1924};
1925
1926static int
1927compareChoke( const void * va,
1928              const void * vb )
1929{
1930    const struct ChokeData * a = va;
1931    const struct ChokeData * b = vb;
1932
1933    if( a->rate != b->rate ) /* prefer higher overall speeds */
1934        return a->rate > b->rate ? -1 : 1;
1935
1936    if( a->isChoked != b->isChoked ) /* prefer unchoked */
1937        return a->isChoked ? 1 : -1;
1938
1939    return 0;
1940}
1941
1942static int
1943isNew( const tr_peer * peer )
1944{
1945    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1946}
1947
1948static int
1949isSame( const tr_peer * peer )
1950{
1951    return peer && peer->client && strstr( peer->client, "Transmission" );
1952}
1953
1954/**
1955***
1956**/
1957
1958static void
1959rechoke( Torrent * t )
1960{
1961    int                i, peerCount, size, unchokedInterested;
1962    tr_peer **         peers = getConnectedPeers( t, &peerCount );
1963    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1964    const int          chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
1965
1966    assert( torrentIsLocked( t ) );
1967
1968    /* sort the peers by preference and rate */
1969    for( i = 0, size = 0; i < peerCount; ++i )
1970    {
1971        tr_peer * peer = peers[i];
1972        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1973
1974        if( peer->progress >= 1.0 ) /* choke all seeds */
1975        {
1976            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1977        }
1978        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
1979        {
1980            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1981        }
1982        else if( chokeAll ) /* choke everyone if we're not uploading */
1983        {
1984            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1985        }
1986        else
1987        {
1988            struct ChokeData * n = &choke[size++];
1989            n->peer         = peer;
1990            n->isInterested = peer->peerIsInterested;
1991            n->isChoked     = peer->peerIsChoked;
1992            n->rate         = tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER ) * 1024;
1993        }
1994    }
1995
1996    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
1997
1998    /**
1999     * Reciprocation and number of uploads capping is managed by unchoking
2000     * the N peers which have the best upload rate and are interested.
2001     * This maximizes the client's download rate. These N peers are
2002     * referred to as downloaders, because they are interested in downloading
2003     * from the client.
2004     *
2005     * Peers which have a better upload rate (as compared to the downloaders)
2006     * but aren't interested get unchoked. If they become interested, the
2007     * downloader with the worst upload rate gets choked. If a client has
2008     * a complete file, it uses its upload rate rather than its download
2009     * rate to decide which peers to unchoke.
2010     */
2011    unchokedInterested = 0;
2012    for( i=0; i<size && unchokedInterested<MAX_UNCHOKED_PEERS; ++i ) {
2013        choke[i].doUnchoke = 1;
2014        if( choke[i].isInterested )
2015            ++unchokedInterested;
2016    }
2017
2018    /* optimistic unchoke */
2019    if( i < size )
2020    {
2021        int n;
2022        struct ChokeData * c;
2023        tr_ptrArray * randPool = tr_ptrArrayNew( );
2024
2025        for( ; i<size; ++i )
2026        {
2027            if( choke[i].isInterested )
2028            {
2029                const tr_peer * peer = choke[i].peer;
2030                int x = 1, y;
2031                if( isNew( peer ) ) x *= 3;
2032                if( isSame( peer ) ) x *= 3;
2033                for( y=0; y<x; ++y )
2034                    tr_ptrArrayAppend( randPool, &choke[i] );
2035            }
2036        }
2037
2038        if(( n = tr_ptrArraySize( randPool )))
2039        {
2040            c = tr_ptrArrayNth( randPool, tr_cryptoWeakRandInt( n ));
2041            c->doUnchoke = 1;
2042            t->optimistic = c->peer;
2043        }
2044
2045        tr_ptrArrayFree( randPool, NULL );
2046    }
2047
2048    for( i = 0; i < size; ++i )
2049        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
2050
2051    /* cleanup */
2052    tr_free( choke );
2053    tr_free( peers );
2054}
2055
2056static int
2057rechokePulse( void * vtorrent )
2058{
2059    Torrent * t = vtorrent;
2060
2061    torrentLock( t );
2062    rechoke( t );
2063    torrentUnlock( t );
2064    return TRUE;
2065}
2066
2067/***
2068****
2069****  Life and Death
2070****
2071***/
2072
2073static int
2074shouldPeerBeClosed( const Torrent * t,
2075                    const tr_peer * peer,
2076                    int             peerCount )
2077{
2078    const tr_torrent *       tor = t->tor;
2079    const time_t             now = time( NULL );
2080    const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
2081
2082    /* if it's marked for purging, close it */
2083    if( peer->doPurge )
2084    {
2085        tordbg( t, "purging peer %s because its doPurge flag is set",
2086                tr_peerIoAddrStr( &atom->addr, atom->port ) );
2087        return TRUE;
2088    }
2089
2090    /* if we're seeding and the peer has everything we have,
2091     * and enough time has passed for a pex exchange, then disconnect */
2092    if( tr_torrentIsSeed( tor ) )
2093    {
2094        int peerHasEverything;
2095        if( atom->flags & ADDED_F_SEED_FLAG )
2096            peerHasEverything = TRUE;
2097        else if( peer->progress < tr_cpPercentDone( tor->completion ) )
2098            peerHasEverything = FALSE;
2099        else {
2100            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( tor->completion ) );
2101            tr_bitfieldDifference( tmp, peer->have );
2102            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2103            tr_bitfieldFree( tmp );
2104        }
2105        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 ))) {
2106            tordbg( t, "purging peer %s because we're both seeds",
2107                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2108            return TRUE;
2109        }
2110    }
2111
2112    /* disconnect if it's been too long since piece data has been transferred.
2113     * this is on a sliding scale based on number of available peers... */
2114    {
2115        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2116        /* if we have >= relaxIfFewerThan, strictness is 100%.
2117         * if we have zero connections, strictness is 0% */
2118        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
2119            ? 1.0
2120            : peerCount / (float)relaxStrictnessIfFewerThanN;
2121        const int lo = MIN_UPLOAD_IDLE_SECS;
2122        const int hi = MAX_UPLOAD_IDLE_SECS;
2123        const int limit = hi - ( ( hi - lo ) * strictness );
2124        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
2125/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
2126        if( idleTime > limit ) {
2127            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
2128                       tr_peerIoAddrStr( &atom->addr, atom->port ), idleTime );
2129            return TRUE;
2130        }
2131    }
2132
2133    return FALSE;
2134}
2135
2136static tr_peer **
2137getPeersToClose( Torrent * t, int * setmeSize )
2138{
2139    int i, peerCount, outsize;
2140    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( t->peers, &peerCount );
2141    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2142
2143    assert( torrentIsLocked( t ) );
2144
2145    for( i = outsize = 0; i < peerCount; ++i )
2146        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
2147            ret[outsize++] = peers[i];
2148
2149    *setmeSize = outsize;
2150    return ret;
2151}
2152
2153static int
2154compareCandidates( const void * va,
2155                   const void * vb )
2156{
2157    const struct peer_atom * a = *(const struct peer_atom**) va;
2158    const struct peer_atom * b = *(const struct peer_atom**) vb;
2159
2160    /* <Charles> Here we would probably want to try reconnecting to
2161     * peers that had most recently given us data. Lots of users have
2162     * trouble with resets due to their routers and/or ISPs. This way we
2163     * can quickly recover from an unwanted reset. So we sort
2164     * piece_data_time in descending order.
2165     */
2166
2167    if( a->piece_data_time != b->piece_data_time )
2168        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2169
2170    if( a->numFails != b->numFails )
2171        return a->numFails < b->numFails ? -1 : 1;
2172
2173    if( a->time != b->time )
2174        return a->time < b->time ? -1 : 1;
2175
2176    /* all other things being equal, prefer peers whose
2177     * information comes from a more reliable source */
2178    if( a->from != b->from )
2179        return a->from < b->from ? -1 : 1;
2180
2181    return 0;
2182}
2183
2184static int
2185getReconnectIntervalSecs( const struct peer_atom * atom )
2186{
2187    int          sec;
2188    const time_t now = time( NULL );
2189
2190    /* if we were recently connected to this peer and transferring piece
2191     * data, try to reconnect to them sooner rather that later -- we don't
2192     * want network troubles to get in the way of a good peer. */
2193    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2194        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2195
2196    /* don't allow reconnects more often than our minimum */
2197    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2198        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2199
2200    /* otherwise, the interval depends on how many times we've tried
2201     * and failed to connect to the peer */
2202    else switch( atom->numFails ) {
2203        case 0: sec = 0; break;
2204        case 1: sec = 5; break;
2205        case 2: sec = 2 * 60; break;
2206        case 3: sec = 15 * 60; break;
2207        case 4: sec = 30 * 60; break;
2208        case 5: sec = 60 * 60; break;
2209        default: sec = 120 * 60; break;
2210    }
2211
2212    return sec;
2213}
2214
2215static struct peer_atom **
2216getPeerCandidates( Torrent * t, int * setmeSize )
2217{
2218    int                 i, atomCount, retCount;
2219    struct peer_atom ** atoms;
2220    struct peer_atom ** ret;
2221    const time_t        now = time( NULL );
2222    const int           seed = tr_torrentIsSeed( t->tor );
2223
2224    assert( torrentIsLocked( t ) );
2225
2226    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &atomCount );
2227    ret = tr_new( struct peer_atom*, atomCount );
2228    for( i = retCount = 0; i < atomCount; ++i )
2229    {
2230        int                interval;
2231        struct peer_atom * atom = atoms[i];
2232
2233        /* peer fed us too much bad data ... we only keep it around
2234         * now to weed it out in case someone sends it to us via pex */
2235        if( atom->myflags & MYFLAG_BANNED )
2236            continue;
2237
2238        /* peer was unconnectable before, so we're not going to keep trying.
2239         * this is needs a separate flag from `banned', since if they try
2240         * to connect to us later, we'll let them in */
2241        if( atom->myflags & MYFLAG_UNREACHABLE )
2242            continue;
2243
2244        /* we don't need two connections to the same peer... */
2245        if( peerIsInUse( t, &atom->addr ) )
2246            continue;
2247
2248        /* no need to connect if we're both seeds... */
2249        if( seed && ( ( atom->flags & ADDED_F_SEED_FLAG ) ||
2250                      ( atom->uploadOnly == UPLOAD_ONLY_YES ) ) )
2251            continue;
2252
2253        /* don't reconnect too often */
2254        interval = getReconnectIntervalSecs( atom );
2255        if( ( now - atom->time ) < interval )
2256        {
2257            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2258                    i, tr_peerIoAddrStr( &atom->addr, atom->port ), interval );
2259            continue;
2260        }
2261
2262        /* Don't connect to peers in our blocklist */
2263        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2264            continue;
2265
2266        ret[retCount++] = atom;
2267    }
2268
2269    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2270    *setmeSize = retCount;
2271    return ret;
2272}
2273
2274static int
2275reconnectPulse( void * vtorrent )
2276{
2277    Torrent *     t = vtorrent;
2278    static time_t prevTime = 0;
2279    static int    newConnectionsThisSecond = 0;
2280    time_t        now;
2281
2282    torrentLock( t );
2283
2284    now = time( NULL );
2285    if( prevTime != now )
2286    {
2287        prevTime = now;
2288        newConnectionsThisSecond = 0;
2289    }
2290
2291    if( !t->isRunning )
2292    {
2293        removeAllPeers( t );
2294    }
2295    else
2296    {
2297        int i, nCandidates, nBad;
2298        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
2299        struct tr_peer ** connections = getPeersToClose( t, &nBad );
2300
2301        if( nBad || nCandidates )
2302            tordbg( t, "reconnect pulse for [%s]: %d bad connections, "
2303                   "%d connection candidates, %d atoms, max per pulse is %d",
2304                   t->tor->info.name, nBad, nCandidates,
2305                   tr_ptrArraySize( t->pool ),
2306                   (int)MAX_RECONNECTIONS_PER_PULSE );
2307
2308        /* disconnect some peers.
2309           if we transferred piece data, then they might be good peers,
2310           so reset their `numFails' weight to zero.  otherwise we connected
2311           to them fruitlessly, so mark it as another fail */
2312        for( i = 0; i < nBad; ++i ) {
2313            tr_peer * peer = connections[i];
2314            struct peer_atom * atom = getExistingAtom( t, &peer->addr );
2315            if( atom->piece_data_time )
2316                atom->numFails = 0;
2317            else
2318                ++atom->numFails;
2319             
2320            tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
2321            removePeer( t, peer );
2322        }
2323
2324        /* add some new ones */
2325        for( i = 0;    ( i < nCandidates )
2326           && ( i < MAX_RECONNECTIONS_PER_PULSE )
2327           && ( getPeerCount( t ) < getMaxPeerCount( t->tor ) )
2328           && ( newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND ); ++i )
2329        {
2330            tr_peerMgr *       mgr = t->manager;
2331            struct peer_atom * atom = candidates[i];
2332            tr_peerIo *        io;
2333
2334            tordbg( t, "Starting an OUTGOING connection with %s",
2335                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2336
2337            io = tr_peerIoNewOutgoing( mgr->session, &atom->addr, atom->port, t->hash );
2338            if( io == NULL )
2339            {
2340                atom->myflags |= MYFLAG_UNREACHABLE;
2341            }
2342            else
2343            {
2344                tr_handshake * handshake = tr_handshakeNew( io,
2345                                                            mgr->session->encryptionMode,
2346                                                            myHandshakeDoneCB,
2347                                                            mgr );
2348
2349                assert( tr_peerIoGetTorrentHash( io ) );
2350
2351                ++newConnectionsThisSecond;
2352
2353                tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake,
2354                                         handshakeCompare );
2355            }
2356
2357            atom->time = time( NULL );
2358        }
2359
2360        /* cleanup */
2361        tr_free( connections );
2362        tr_free( candidates );
2363    }
2364
2365    torrentUnlock( t );
2366    return TRUE;
2367}
2368
2369/****
2370*****
2371*****  BANDWIDTH ALLOCATION
2372*****
2373****/
2374
2375static void
2376pumpAllPeers( tr_peerMgr * mgr )
2377{
2378    const int torrentCount = tr_ptrArraySize( mgr->torrents );
2379    int       i, j;
2380
2381    for( i=0; i<torrentCount; ++i )
2382    {
2383        Torrent * t = tr_ptrArrayNth( mgr->torrents, i );
2384        for( j=0; j<tr_ptrArraySize( t->peers ); ++j )
2385        {
2386            tr_peer * peer = tr_ptrArrayNth( t->peers, j );
2387            tr_peerMsgsPulse( peer->msgs );
2388        }
2389    }
2390}
2391
2392static int
2393bandwidthPulse( void * vmgr )
2394{
2395    tr_peerMgr * mgr = vmgr;
2396    managerLock( mgr );
2397
2398    pumpAllPeers( mgr );
2399    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
2400    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
2401    pumpAllPeers( mgr );
2402
2403    managerUnlock( mgr );
2404    return TRUE;
2405}
Note: See TracBrowser for help on using the repository browser.