source: trunk/libtransmission/peer-mgr.c @ 7761

Last change on this file since 7761 was 7761, checked in by charles, 12 years ago

(trunk libT) when pruning out slow peers, don't prune if we don't have good candidates to replace them.

  • Property svn:keywords set to Date Rev Author Id
File size: 67.0 KB
Line 
1
2/*
3 * This file Copyright (C) 2007-2009 Charles Kerr <charles@transmissionbt.com>
4 *
5 * This file is licensed by the GPL version 2.  Works owned by the
6 * Transmission project are granted a special exemption to clause 2(b)
7 * so that the bulk of its code can remain under the MIT license.
8 * This exemption does not extend to derived works not owned by
9 * the Transmission project.
10 *
11 * $Id: peer-mgr.c 7761 2009-01-20 03:32:54Z charles $
12 */
13
14#include <assert.h>
15#include <errno.h>
16#include <string.h> /* memcpy, memcmp, strstr */
17#include <stdlib.h> /* qsort */
18#include <limits.h> /* INT_MAX */
19
20#include <event.h>
21
22#include "transmission.h"
23#include "session.h"
24#include "bandwidth.h"
25#include "bencode.h"
26#include "blocklist.h"
27#include "clients.h"
28#include "completion.h"
29#include "crypto.h"
30#include "fdlimit.h"
31#include "handshake.h"
32#include "inout.h" /* tr_ioTestPiece */
33#include "net.h"
34#include "peer-io.h"
35#include "peer-mgr.h"
36#include "peer-msgs.h"
37#include "ptrarray.h"
38#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
39#include "torrent.h"
40#include "trevent.h"
41#include "utils.h"
42#include "webseed.h"
43
44enum
45{
46    /* how frequently to change which peers are choked */
47    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
48
49    /* minimum interval for refilling peers' request lists */
50    REFILL_PERIOD_MSEC = 333,
51
52    /* when many peers are available, keep idle ones this long */
53    MIN_UPLOAD_IDLE_SECS = ( 30 ),
54
55    /* when few peers are available, keep idle ones this long */
56    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
57
58    /* how frequently to decide which peers live and die */
59    RECONNECT_PERIOD_MSEC = ( 2 * 1000 ),
60   
61    /* how frequently to reallocate bandwidth */
62    BANDWIDTH_PERIOD_MSEC = 500,
63
64    /* max # of peers to ask fer per torrent per reconnect pulse */
65    MAX_RECONNECTIONS_PER_PULSE = 16,
66
67    /* max number of peers to ask for per second overall.
68    * this throttle is to avoid overloading the router */
69    MAX_CONNECTIONS_PER_SECOND = 32,
70
71    /* number of bad pieces a peer is allowed to send before we ban them */
72    MAX_BAD_PIECES_PER_PEER = 5,
73
74    /* use for bitwise operations w/peer_atom.myflags */
75    MYFLAG_BANNED = 1,
76
77    /* unreachable for now... but not banned.
78     * if they try to connect to us it's okay */
79    MYFLAG_UNREACHABLE = 2,
80
81    /* the minimum we'll wait before attempting to reconnect to a peer */
82    MINIMUM_RECONNECT_INTERVAL_SECS = 5
83};
84
85
86/**
87***
88**/
89
90enum
91{
92    UPLOAD_ONLY_UKNOWN,
93    UPLOAD_ONLY_YES,
94    UPLOAD_ONLY_NO
95};
96
97/**
98 * Peer information that should be kept even before we've connected and
99 * after we've disconnected.  These are kept in a pool of peer_atoms to decide
100 * which ones would make good candidates for connecting to, and to watch out
101 * for banned peers.
102 *
103 * @see tr_peer
104 * @see tr_peermsgs
105 */
106struct peer_atom
107{
108    uint8_t     from;
109    uint8_t     flags;       /* these match the added_f flags */
110    uint8_t     myflags;     /* flags that aren't defined in added_f */
111    uint8_t     uploadOnly;  /* UPLOAD_ONLY_ */
112    tr_port     port;
113    uint16_t    numFails;
114    tr_address  addr;
115    time_t      time;        /* when the peer's connection status last changed */
116    time_t      piece_data_time;
117};
118
119typedef struct tr_torrent_peers
120{
121    tr_bool         isRunning;
122
123    uint8_t         hash[SHA_DIGEST_LENGTH];
124    int         *   pendingRequestCount;
125    tr_ptrArray     outgoingHandshakes; /* tr_handshake */
126    tr_ptrArray     pool; /* struct peer_atom */
127    tr_ptrArray     peers; /* tr_peer */
128    tr_ptrArray     webseeds; /* tr_webseed */
129    tr_timer *      reconnectTimer;
130    tr_timer *      rechokeTimer;
131    tr_timer *      refillTimer;
132    tr_torrent *    tor;
133    tr_peer *       optimistic; /* the optimistic peer, or NULL if none */
134
135    struct tr_peerMgr * manager;
136}
137Torrent;
138
139struct tr_peerMgr
140{
141    tr_session      * session;
142    tr_ptrArray       incomingHandshakes; /* tr_handshake */
143    tr_timer        * bandwidthTimer;
144};
145
146#define tordbg( t, ... ) \
147    do { \
148        if( tr_deepLoggingIsActive( ) ) \
149            tr_deepLog( __FILE__, __LINE__, t->tor->info.name, __VA_ARGS__ ); \
150    } while( 0 )
151
152#define dbgmsg( ... ) \
153    do { \
154        if( tr_deepLoggingIsActive( ) ) \
155            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
156    } while( 0 )
157
158/**
159***
160**/
161
162static TR_INLINE void
163managerLock( const struct tr_peerMgr * manager )
164{
165    tr_globalLock( manager->session );
166}
167
168static TR_INLINE void
169managerUnlock( const struct tr_peerMgr * manager )
170{
171    tr_globalUnlock( manager->session );
172}
173
174static TR_INLINE void
175torrentLock( Torrent * torrent )
176{
177    managerLock( torrent->manager );
178}
179
180static TR_INLINE void
181torrentUnlock( Torrent * torrent )
182{
183    managerUnlock( torrent->manager );
184}
185
186static TR_INLINE int
187torrentIsLocked( const Torrent * t )
188{
189    return tr_globalIsLocked( t->manager->session );
190}
191
192/**
193***
194**/
195
196static int
197handshakeCompareToAddr( const void * va, const void * vb )
198{
199    const tr_handshake * a = va;
200
201    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
202}
203
204static int
205handshakeCompare( const void * a, const void * b )
206{
207    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
208}
209
210static tr_handshake*
211getExistingHandshake( tr_ptrArray      * handshakes,
212                      const tr_address * addr )
213{
214    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
215}
216
217static int
218comparePeerAtomToAddress( const void * va, const void * vb )
219{
220    const struct peer_atom * a = va;
221
222    return tr_compareAddresses( &a->addr, vb );
223}
224
225static int
226comparePeerAtoms( const void * va, const void * vb )
227{
228    const struct peer_atom * b = vb;
229
230    return comparePeerAtomToAddress( va, &b->addr );
231}
232
233/**
234***
235**/
236
237static Torrent*
238getExistingTorrent( tr_peerMgr *    manager,
239                    const uint8_t * hash )
240{
241    tr_torrent * tor = tr_torrentFindFromHash( manager->session, hash );
242
243    return tor == NULL ? NULL : tor->torrentPeers;
244}
245
246static int
247peerCompare( const void * va, const void * vb )
248{
249    const tr_peer * a = va;
250    const tr_peer * b = vb;
251
252    return tr_compareAddresses( &a->addr, &b->addr );
253}
254
255static int
256peerCompareToAddr( const void * va, const void * vb )
257{
258    const tr_peer * a = va;
259
260    return tr_compareAddresses( &a->addr, vb );
261}
262
263static tr_peer*
264getExistingPeer( Torrent          * torrent,
265                 const tr_address * addr )
266{
267    assert( torrentIsLocked( torrent ) );
268    assert( addr );
269
270    return tr_ptrArrayFindSorted( &torrent->peers, addr, peerCompareToAddr );
271}
272
273static struct peer_atom*
274getExistingAtom( const Torrent    * t,
275                 const tr_address * addr )
276{
277    Torrent * tt = (Torrent*)t;
278    assert( torrentIsLocked( t ) );
279    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
280}
281
282static tr_bool
283peerIsInUse( const Torrent    * ct,
284             const tr_address * addr )
285{
286    Torrent * t = (Torrent*) ct;
287
288    assert( torrentIsLocked ( t ) );
289
290    return getExistingPeer( t, addr )
291        || getExistingHandshake( &t->outgoingHandshakes, addr )
292        || getExistingHandshake( &t->manager->incomingHandshakes, addr );
293}
294
295static tr_peer*
296peerConstructor( const tr_address * addr )
297{
298    tr_peer * p;
299    p = tr_new0( tr_peer, 1 );
300    p->addr = *addr;
301    return p;
302}
303
304static tr_peer*
305getPeer( Torrent          * torrent,
306         const tr_address * addr )
307{
308    tr_peer * peer;
309
310    assert( torrentIsLocked( torrent ) );
311
312    peer = getExistingPeer( torrent, addr );
313
314    if( peer == NULL )
315    {
316        peer = peerConstructor( addr );
317        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
318    }
319
320    return peer;
321}
322
323static void
324peerDestructor( tr_peer * peer )
325{
326    assert( peer );
327
328    if( peer->msgs != NULL )
329    {
330        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
331        tr_peerMsgsFree( peer->msgs );
332    }
333
334    tr_peerIoClear( peer->io );
335    tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
336
337    tr_bitfieldFree( peer->have );
338    tr_bitfieldFree( peer->blame );
339    tr_free( peer->client );
340
341    tr_free( peer );
342}
343
344static void
345removePeer( Torrent * t,
346            tr_peer * peer )
347{
348    tr_peer *          removed;
349    struct peer_atom * atom;
350
351    assert( torrentIsLocked( t ) );
352
353    atom = getExistingAtom( t, &peer->addr );
354    assert( atom );
355    atom->time = time( NULL );
356
357    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
358    assert( removed == peer );
359    peerDestructor( removed );
360}
361
362static void
363removeAllPeers( Torrent * t )
364{
365    while( !tr_ptrArrayEmpty( &t->peers ) )
366        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
367}
368
369static void
370torrentDestructor( void * vt )
371{
372    Torrent * t = vt;
373    uint8_t   hash[SHA_DIGEST_LENGTH];
374
375    assert( t );
376    assert( !t->isRunning );
377    assert( torrentIsLocked( t ) );
378    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
379    assert( tr_ptrArrayEmpty( &t->peers ) );
380
381    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
382
383    tr_timerFree( &t->reconnectTimer );
384    tr_timerFree( &t->rechokeTimer );
385    tr_timerFree( &t->refillTimer );
386
387    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
388    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
389    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
390    tr_ptrArrayDestruct( &t->peers, NULL );
391
392    tr_free( t->pendingRequestCount );
393    tr_free( t );
394}
395
396static void peerCallbackFunc( void * vpeer,
397                              void * vevent,
398                              void * vt );
399
400static Torrent*
401torrentConstructor( tr_peerMgr * manager,
402                    tr_torrent * tor )
403{
404    int       i;
405    Torrent * t;
406
407    t = tr_new0( Torrent, 1 );
408    t->manager = manager;
409    t->tor = tor;
410    t->pool = TR_PTR_ARRAY_INIT;
411    t->peers = TR_PTR_ARRAY_INIT;
412    t->webseeds = TR_PTR_ARRAY_INIT;
413    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
414    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
415
416    for( i = 0; i < tor->info.webseedCount; ++i )
417    {
418        tr_webseed * w =
419            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
420        tr_ptrArrayAppend( &t->webseeds, w );
421    }
422
423    return t;
424}
425
426
427static int bandwidthPulse( void * vmgr );
428
429
430tr_peerMgr*
431tr_peerMgrNew( tr_session * session )
432{
433    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
434
435    m->session = session;
436    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
437    m->bandwidthTimer = tr_timerNew( session, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
438    return m;
439}
440
441void
442tr_peerMgrFree( tr_peerMgr * manager )
443{
444    managerLock( manager );
445
446    tr_timerFree( &manager->bandwidthTimer );
447
448    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
449     * the item from manager->handshakes, so this is a little roundabout... */
450    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
451        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
452
453    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
454
455    managerUnlock( manager );
456    tr_free( manager );
457}
458
459static int
460clientIsDownloadingFrom( const tr_peer * peer )
461{
462    return peer->clientIsInterested && !peer->clientIsChoked;
463}
464
465static int
466clientIsUploadingTo( const tr_peer * peer )
467{
468    return peer->peerIsInterested && !peer->peerIsChoked;
469}
470
471/***
472****
473***/
474
475tr_bool
476tr_peerMgrPeerIsSeed( const tr_torrent  * tor,
477                      const tr_address  * addr )
478{
479    tr_bool isSeed = FALSE;
480    const Torrent * t = tor->torrentPeers;
481    const struct peer_atom * atom = getExistingAtom( t, addr );
482
483    if( atom )
484        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
485
486    return isSeed;
487}
488
489/****
490*****
491*****  REFILL
492*****
493****/
494
495static void
496assertValidPiece( Torrent * t, tr_piece_index_t piece )
497{
498    assert( t );
499    assert( t->tor );
500    assert( piece < t->tor->info.pieceCount );
501}
502
503static int
504getPieceRequests( Torrent * t, tr_piece_index_t piece )
505{
506    assertValidPiece( t, piece );
507
508    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
509}
510
511static void
512incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
513{
514    assertValidPiece( t, piece );
515
516    if( t->pendingRequestCount == NULL )
517        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
518    t->pendingRequestCount[piece]++;
519}
520
521static void
522decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
523{
524    assertValidPiece( t, piece );
525
526    if( t->pendingRequestCount )
527        t->pendingRequestCount[piece]--;
528}
529
530struct tr_refill_piece
531{
532    tr_priority_t    priority;
533    uint32_t         piece;
534    uint32_t         peerCount;
535    int              random;
536    int              pendingRequestCount;
537    int              missingBlockCount;
538};
539
540static int
541compareRefillPiece( const void * aIn, const void * bIn )
542{
543    const struct tr_refill_piece * a = aIn;
544    const struct tr_refill_piece * b = bIn;
545
546    /* if one piece has a higher priority, it goes first */
547    if( a->priority != b->priority )
548        return a->priority > b->priority ? -1 : 1;
549
550    /* have a per-priority endgame */
551    if( a->pendingRequestCount != b->pendingRequestCount )
552        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
553
554    /* fewer missing pieces goes first */
555    if( a->missingBlockCount != b->missingBlockCount )
556        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
557
558    /* otherwise if one has fewer peers, it goes first */
559    if( a->peerCount != b->peerCount )
560        return a->peerCount < b->peerCount ? -1 : 1;
561
562    /* otherwise go with our random seed */
563    if( a->random != b->random )
564        return a->random < b->random ? -1 : 1;
565
566    return 0;
567}
568
569static tr_piece_index_t *
570getPreferredPieces( Torrent * t, tr_piece_index_t * pieceCount )
571{
572    const tr_torrent  * tor = t->tor;
573    const tr_info     * inf = &tor->info;
574    tr_piece_index_t    i;
575    tr_piece_index_t    poolSize = 0;
576    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
577    int                 peerCount;
578    const tr_peer    ** peers;
579
580    assert( torrentIsLocked( t ) );
581
582    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
583    peerCount = tr_ptrArraySize( &t->peers );
584
585    /* make a list of the pieces that we want but don't have */
586    for( i = 0; i < inf->pieceCount; ++i )
587        if( !tor->info.pieces[i].dnd
588                && !tr_cpPieceIsComplete( &tor->completion, i ) )
589            pool[poolSize++] = i;
590
591    /* sort the pool by which to request next */
592    if( poolSize > 1 )
593    {
594        tr_piece_index_t j;
595        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
596
597        for( j = 0; j < poolSize; ++j )
598        {
599            int k;
600            const tr_piece_index_t piece = pool[j];
601            struct tr_refill_piece * setme = p + j;
602
603            setme->piece = piece;
604            setme->priority = inf->pieces[piece].priority;
605            setme->peerCount = 0;
606            setme->random = tr_cryptoWeakRandInt( INT_MAX );
607            setme->pendingRequestCount = getPieceRequests( t, piece );
608            setme->missingBlockCount
609                         = tr_cpMissingBlocksInPiece( &tor->completion, piece );
610
611            for( k = 0; k < peerCount; ++k )
612            {
613                const tr_peer * peer = peers[k];
614                if( peer->peerIsInterested
615                        && !peer->clientIsChoked
616                        && tr_bitfieldHas( peer->have, piece ) )
617                    ++setme->peerCount;
618            }
619        }
620
621        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
622               compareRefillPiece );
623
624        for( j = 0; j < poolSize; ++j )
625            pool[j] = p[j].piece;
626
627        tr_free( p );
628    }
629
630    *pieceCount = poolSize;
631    return pool;
632}
633
634struct tr_blockIterator
635{
636    Torrent * t;
637    tr_block_index_t blockIndex, blockCount, *blocks;
638    tr_piece_index_t pieceIndex, pieceCount, *pieces;
639};
640
641static struct tr_blockIterator*
642blockIteratorNew( Torrent * t )
643{
644    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
645    i->t = t;
646    i->pieces = getPreferredPieces( t, &i->pieceCount );
647    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCountInPiece );
648    return i;
649}
650
651static int
652blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
653{
654    int found;
655    Torrent * t = i->t;
656    tr_torrent * tor = t->tor;
657
658    while( ( i->blockIndex == i->blockCount )
659        && ( i->pieceIndex < i->pieceCount ) )
660    {
661        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
662        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
663        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
664        tr_block_index_t block;
665
666        assert( index < tor->info.pieceCount );
667
668        i->blockCount = 0;
669        i->blockIndex = 0;
670        for( block=b; block!=e; ++block )
671            if( !tr_cpBlockIsComplete( &tor->completion, block ) )
672                i->blocks[i->blockCount++] = block;
673    }
674
675    assert( i->blockCount <= tor->blockCountInPiece );
676
677    if(( found = ( i->blockIndex < i->blockCount )))
678        *setme = i->blocks[i->blockIndex++];
679
680    return found;
681}
682
683static void
684blockIteratorFree( struct tr_blockIterator * i )
685{
686    tr_free( i->blocks );
687    tr_free( i->pieces );
688    tr_free( i );
689}
690
691static tr_peer**
692getPeersUploadingToClient( Torrent * t,
693                           int *     setmeCount )
694{
695    int j;
696    int peerCount = 0;
697    int retCount = 0;
698    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
699    tr_peer ** ret = tr_new( tr_peer *, peerCount );
700
701    j = 0; /* this is a temporary test to make sure we walk through all the peers */
702    if( peerCount )
703    {
704        /* Get a list of peers we're downloading from.
705           Pick a different starting point each time so all peers
706           get a chance at being the first in line */
707        const int fencepost = tr_cryptoWeakRandInt( peerCount );
708        int i = fencepost;
709        do {
710            if( clientIsDownloadingFrom( peers[i] ) )
711                ret[retCount++] = peers[i];
712            i = ( i + 1 ) % peerCount;
713            ++j;
714        } while( i != fencepost );
715    }
716    assert( j == peerCount );
717    *setmeCount = retCount;
718    return ret;
719}
720
721static uint32_t
722getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
723{
724    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
725    const uint64_t blockPos = tor->blockSize * b;
726    assert( blockPos >= piecePos );
727    return (uint32_t)( blockPos - piecePos );
728}
729
730static int
731refillPulse( void * vtorrent )
732{
733    tr_block_index_t block;
734    int peerCount;
735    int webseedCount;
736    tr_peer ** peers;
737    tr_webseed ** webseeds;
738    struct tr_blockIterator * blockIterator;
739    Torrent * t = vtorrent;
740    tr_torrent * tor = t->tor;
741
742    if( !t->isRunning )
743        return TRUE;
744    if( tr_torrentIsSeed( t->tor ) )
745        return TRUE;
746
747    torrentLock( t );
748    tordbg( t, "Refilling Request Buffers..." );
749
750    blockIterator = blockIteratorNew( t );
751    peers = getPeersUploadingToClient( t, &peerCount );
752    webseedCount = tr_ptrArraySize( &t->webseeds );
753    webseeds = tr_memdup( tr_ptrArrayBase( &t->webseeds ),
754                          webseedCount * sizeof( tr_webseed* ) );
755
756    while( ( webseedCount || peerCount )
757        && blockIteratorNext( blockIterator, &block ) )
758    {
759        int j;
760        int handled = FALSE;
761
762        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
763        const uint32_t offset = getBlockOffsetInPiece( tor, block );
764        const uint32_t length = tr_torBlockCountBytes( tor, block );
765
766        /* find a peer who can ask for this block */
767        for( j=0; !handled && j<peerCount; )
768        {
769            const tr_addreq_t val = tr_peerMsgsAddRequest( peers[j]->msgs, index, offset, length );
770            switch( val )
771            {
772                case TR_ADDREQ_FULL:
773                case TR_ADDREQ_CLIENT_CHOKED:
774                    peers[j] = peers[--peerCount];
775                    break;
776
777                case TR_ADDREQ_MISSING:
778                case TR_ADDREQ_DUPLICATE:
779                    ++j;
780                    break;
781
782                case TR_ADDREQ_OK:
783                    incrementPieceRequests( t, index );
784                    handled = TRUE;
785                    break;
786
787                default:
788                    assert( 0 && "unhandled value" );
789                    break;
790            }
791        }
792
793        /* maybe one of the webseeds can do it */
794        for( j=0; !handled && j<webseedCount; )
795        {
796            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j], index, offset, length );
797            switch( val )
798            {
799                case TR_ADDREQ_FULL:
800                    webseeds[j] = webseeds[--webseedCount];
801                    break;
802
803                case TR_ADDREQ_OK:
804                    incrementPieceRequests( t, index );
805                    handled = TRUE;
806                    break;
807
808                default:
809                    assert( 0 && "unhandled value" );
810                    break;
811            }
812        }
813    }
814
815    /* cleanup */
816    blockIteratorFree( blockIterator );
817    tr_free( webseeds );
818    tr_free( peers );
819
820    t->refillTimer = NULL;
821    torrentUnlock( t );
822    return FALSE;
823}
824
825static void
826broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
827{
828    size_t i;
829    size_t peerCount;
830    tr_peer ** peers;
831
832    assert( torrentIsLocked( t ) );
833
834    tordbg( t, "got a block; cancelling any duplicate requests from peers %"PRIu32":%"PRIu32"->%"PRIu32, index, offset, length );
835
836    peerCount = tr_ptrArraySize( &t->peers );
837    peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
838    for( i=0; i<peerCount; ++i )
839        if( peers[i]->msgs )
840            tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
841}
842
843static void
844addStrike( Torrent * t,
845           tr_peer * peer )
846{
847    tordbg( t, "increasing peer %s strike count to %d",
848            tr_peerIoAddrStr( &peer->addr,
849                              peer->port ), peer->strikes + 1 );
850
851    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
852    {
853        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
854        atom->myflags |= MYFLAG_BANNED;
855        peer->doPurge = 1;
856        tordbg( t, "banning peer %s", tr_peerIoAddrStr( &atom->addr, atom->port ) );
857    }
858}
859
860static void
861gotBadPiece( Torrent *        t,
862             tr_piece_index_t pieceIndex )
863{
864    tr_torrent *   tor = t->tor;
865    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
866
867    tor->corruptCur += byteCount;
868    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
869}
870
871static void
872refillSoon( Torrent * t )
873{
874    if( t->refillTimer == NULL )
875        t->refillTimer = tr_timerNew( t->manager->session,
876                                      refillPulse, t,
877                                      REFILL_PERIOD_MSEC );
878}
879
880static void
881peerSuggestedPiece( Torrent            * t UNUSED,
882                    tr_peer            * peer UNUSED,
883                    tr_piece_index_t     pieceIndex UNUSED,
884                    int                  isFastAllowed UNUSED )
885{
886#if 0
887    assert( t );
888    assert( peer );
889    assert( peer->msgs );
890
891    /* is this a valid piece? */
892    if(  pieceIndex >= t->tor->info.pieceCount )
893        return;
894
895    /* don't ask for it if we've already got it */
896    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
897        return;
898
899    /* don't ask for it if they don't have it */
900    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
901        return;
902
903    /* don't ask for it if we're choked and it's not fast */
904    if( !isFastAllowed && peer->clientIsChoked )
905        return;
906
907    /* request the blocks that we don't have in this piece */
908    {
909        tr_block_index_t block;
910        const tr_torrent * tor = t->tor;
911        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
912        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
913
914        for( block=start; block<end; ++block )
915        {
916            if( !tr_cpBlockIsComplete( tor->completion, block ) )
917            {
918                const uint32_t offset = getBlockOffsetInPiece( tor, block );
919                const uint32_t length = tr_torBlockCountBytes( tor, block );
920                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
921                incrementPieceRequests( t, pieceIndex );
922            }
923        }
924    }
925#endif
926}
927
928static void
929peerCallbackFunc( void * vpeer, void * vevent, void * vt )
930{
931    tr_peer * peer = vpeer; /* may be NULL if peer is a webseed */
932    Torrent * t = vt;
933    const tr_peer_event * e = vevent;
934
935    torrentLock( t );
936
937    switch( e->eventType )
938    {
939        case TR_PEER_UPLOAD_ONLY:
940            /* update our atom */
941            if( peer ) {
942                struct peer_atom * a = getExistingAtom( t, &peer->addr );
943                a->uploadOnly = e->uploadOnly ? UPLOAD_ONLY_YES : UPLOAD_ONLY_NO;
944            }
945            break;
946
947        case TR_PEER_NEED_REQ:
948            refillSoon( t );
949            break;
950
951        case TR_PEER_CANCEL:
952            decrementPieceRequests( t, e->pieceIndex );
953            break;
954
955        case TR_PEER_PEER_GOT_DATA:
956        {
957            const time_t now = time( NULL );
958            tr_torrent * tor = t->tor;
959
960            tor->activityDate = now;
961
962            if( e->wasPieceData )
963                tor->uploadedCur += e->length;
964
965            /* update the stats */
966            if( e->wasPieceData )
967                tr_statsAddUploaded( tor->session, e->length );
968
969            /* update our atom */
970            if( peer ) {
971                struct peer_atom * a = getExistingAtom( t, &peer->addr );
972                if( e->wasPieceData )
973                    a->piece_data_time = now;
974            }
975
976            break;
977        }
978
979        case TR_PEER_CLIENT_GOT_SUGGEST:
980            if( peer )
981                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
982            break;
983
984        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
985            if( peer )
986                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
987            break;
988
989        case TR_PEER_CLIENT_GOT_DATA:
990        {
991            const time_t now = time( NULL );
992            tr_torrent * tor = t->tor;
993            tor->activityDate = now;
994
995            /* only add this to downloadedCur if we got it from a peer --
996             * webseeds shouldn't count against our ratio.  As one tracker
997             * admin put it, "Those pieces are downloaded directly from the
998             * content distributor, not the peers, it is the tracker's job
999             * to manage the swarms, not the web server and does not fit
1000             * into the jurisdiction of the tracker." */
1001            if( peer && e->wasPieceData )
1002                tor->downloadedCur += e->length;
1003
1004            /* update the stats */ 
1005            if( e->wasPieceData )
1006                tr_statsAddDownloaded( tor->session, e->length );
1007
1008            /* update our atom */
1009            if( peer ) {
1010                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1011                if( e->wasPieceData )
1012                    a->piece_data_time = now;
1013            }
1014
1015            break;
1016        }
1017
1018        case TR_PEER_PEER_PROGRESS:
1019        {
1020            if( peer )
1021            {
1022                struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1023                const int peerIsSeed = e->progress >= 1.0;
1024                if( peerIsSeed ) {
1025                    tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1026                    atom->flags |= ADDED_F_SEED_FLAG;
1027                } else {
1028                    tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1029                    atom->flags &= ~ADDED_F_SEED_FLAG;
1030                }
1031            }
1032            break;
1033        }
1034
1035        case TR_PEER_CLIENT_GOT_BLOCK:
1036        {
1037            tr_torrent * tor = t->tor;
1038
1039            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1040
1041            tr_cpBlockAdd( &tor->completion, block );
1042            decrementPieceRequests( t, e->pieceIndex );
1043
1044            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1045
1046            if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1047            {
1048                const tr_piece_index_t p = e->pieceIndex;
1049                const tr_bool ok = tr_ioTestPiece( tor, p, NULL, 0 );
1050
1051                if( !ok )
1052                {
1053                    tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1054                               (unsigned long)p );
1055                }
1056
1057                tr_torrentSetHasPiece( tor, p, ok );
1058                tr_torrentSetPieceChecked( tor, p, TRUE );
1059                tr_peerMgrSetBlame( tor, p, ok );
1060
1061                if( !ok )
1062                {
1063                    gotBadPiece( t, p );
1064                }
1065                else
1066                {
1067                    int i;
1068                    int peerCount;
1069                    tr_peer ** peers;
1070                    tr_file_index_t fileIndex;
1071
1072                    peerCount = tr_ptrArraySize( &t->peers );
1073                    peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1074                    for( i=0; i<peerCount; ++i )
1075                        tr_peerMsgsHave( peers[i]->msgs, p );
1076
1077                    for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex )
1078                    {
1079                        const tr_file * file = &tor->info.files[fileIndex];
1080                        if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) && tr_cpFileIsComplete( &tor->completion, fileIndex ) )
1081                        {
1082                            char * path = tr_buildPath( tor->downloadDir, file->name, NULL );
1083                            tordbg( t, "closing recently-completed file \"%s\"", path );
1084                            tr_fdFileClose( path );
1085                            tr_free( path );
1086                        }
1087                    }
1088                }
1089
1090                tr_torrentRecheckCompleteness( tor );
1091            }
1092            break;
1093        }
1094
1095        case TR_PEER_ERROR:
1096            if( e->err == EINVAL )
1097            {
1098                addStrike( t, peer );
1099                peer->doPurge = 1;
1100                tordbg( t, "setting %s doPurge flag because we got an EINVAL error", tr_peerIoAddrStr( &peer->addr, peer->port ) );
1101            }
1102            else if( ( e->err == ERANGE )
1103                  || ( e->err == EMSGSIZE )
1104                  || ( e->err == ENOTCONN ) )
1105            {
1106                /* some protocol error from the peer */
1107                peer->doPurge = 1;
1108                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error", tr_peerIoAddrStr( &peer->addr, peer->port ) );
1109            }
1110            else /* a local error, such as an IO error */
1111            {
1112                t->tor->error = e->err;
1113                tr_strlcpy( t->tor->errorString,
1114                            tr_strerror( t->tor->error ),
1115                            sizeof( t->tor->errorString ) );
1116                tr_torrentStop( t->tor );
1117            }
1118            break;
1119
1120        default:
1121            assert( 0 );
1122    }
1123
1124    torrentUnlock( t );
1125}
1126
1127static void
1128ensureAtomExists( Torrent          * t,
1129                  const tr_address * addr,
1130                  tr_port            port,
1131                  uint8_t            flags,
1132                  uint8_t            from )
1133{
1134    if( getExistingAtom( t, addr ) == NULL )
1135    {
1136        struct peer_atom * a;
1137        a = tr_new0( struct peer_atom, 1 );
1138        a->addr = *addr;
1139        a->port = port;
1140        a->flags = flags;
1141        a->from = from;
1142        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr( &a->addr, a->port ) );
1143        tr_ptrArrayInsertSorted( &t->pool, a, comparePeerAtoms );
1144    }
1145}
1146
1147static int
1148getMaxPeerCount( const tr_torrent * tor )
1149{
1150    return tor->maxConnectedPeers;
1151}
1152
1153static int
1154getPeerCount( const Torrent * t )
1155{
1156    return tr_ptrArraySize( &t->peers );// + tr_ptrArraySize( &t->outgoingHandshakes );
1157}
1158
1159/* FIXME: this is kind of a mess. */
1160static tr_bool
1161myHandshakeDoneCB( tr_handshake  * handshake,
1162                   tr_peerIo     * io,
1163                   int             isConnected,
1164                   const uint8_t * peer_id,
1165                   void          * vmanager )
1166{
1167    tr_bool            ok = isConnected;
1168    tr_bool            success = FALSE;
1169    tr_port            port;
1170    const tr_address * addr;
1171    tr_peerMgr       * manager = vmanager;
1172    Torrent          * t;
1173    tr_handshake     * ours;
1174
1175    assert( io );
1176    assert( isConnected == 0 || isConnected == 1 );
1177
1178    t = tr_peerIoHasTorrentHash( io )
1179        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1180        : NULL;
1181
1182    if( tr_peerIoIsIncoming ( io ) )
1183        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1184                                        handshake, handshakeCompare );
1185    else if( t )
1186        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1187                                        handshake, handshakeCompare );
1188    else
1189        ours = handshake;
1190
1191    assert( ours );
1192    assert( ours == handshake );
1193
1194    if( t )
1195        torrentLock( t );
1196
1197    addr = tr_peerIoGetAddress( io, &port );
1198
1199    if( !ok || !t || !t->isRunning )
1200    {
1201        if( t )
1202        {
1203            struct peer_atom * atom = getExistingAtom( t, addr );
1204            if( atom )
1205                ++atom->numFails;
1206        }
1207    }
1208    else /* looking good */
1209    {
1210        struct peer_atom * atom;
1211        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1212        atom = getExistingAtom( t, addr );
1213        atom->time = time( NULL );
1214        atom->piece_data_time = 0;
1215
1216        if( atom->myflags & MYFLAG_BANNED )
1217        {
1218            tordbg( t, "banned peer %s tried to reconnect",
1219                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
1220        }
1221        else if( tr_peerIoIsIncoming( io )
1222               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1223
1224        {
1225        }
1226        else
1227        {
1228            tr_peer * peer = getExistingPeer( t, addr );
1229
1230            if( peer ) /* we already have this peer */
1231            {
1232            }
1233            else
1234            {
1235                peer = getPeer( t, addr );
1236                tr_free( peer->client );
1237
1238                if( !peer_id )
1239                    peer->client = NULL;
1240                else {
1241                    char client[128];
1242                    tr_clientForId( client, sizeof( client ), peer_id );
1243                    peer->client = tr_strdup( client );
1244                }
1245
1246                peer->port = port;
1247                peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is
1248                                                                balanced by our unref in peerDestructor()  */
1249                tr_peerIoSetParent( peer->io, t->tor->bandwidth );
1250                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1251
1252                success = TRUE;
1253            }
1254        }
1255    }
1256
1257    if( t )
1258        torrentUnlock( t );
1259
1260    return success;
1261}
1262
1263void
1264tr_peerMgrAddIncoming( tr_peerMgr * manager,
1265                       tr_address * addr,
1266                       tr_port      port,
1267                       int          socket )
1268{
1269    managerLock( manager );
1270
1271    if( tr_sessionIsAddressBlocked( manager->session, addr ) )
1272    {
1273        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
1274        tr_netClose( socket );
1275    }
1276    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
1277    {
1278        tr_netClose( socket );
1279    }
1280    else /* we don't have a connetion to them yet... */
1281    {
1282        tr_peerIo *    io;
1283        tr_handshake * handshake;
1284
1285        io = tr_peerIoNewIncoming( manager->session, manager->session->bandwidth, addr, port, socket );
1286
1287        handshake = tr_handshakeNew( io,
1288                                     manager->session->encryptionMode,
1289                                     myHandshakeDoneCB,
1290                                     manager );
1291
1292        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
1293
1294        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
1295                                 handshakeCompare );
1296    }
1297
1298    managerUnlock( manager );
1299}
1300
1301static tr_bool
1302tr_isPex( const tr_pex * pex )
1303{
1304    return pex && tr_isAddress( &pex->addr );
1305}
1306
1307void
1308tr_peerMgrAddPex( tr_torrent   *  tor,
1309                  uint8_t         from,
1310                  const tr_pex *  pex )
1311{
1312    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
1313    {
1314        Torrent * t = tor->torrentPeers;
1315        managerLock( t->manager );
1316
1317        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
1318            if( tr_isValidPeerAddress( &pex->addr, pex->port ) )
1319                ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from );
1320
1321        managerUnlock( t->manager );
1322    }
1323}
1324
1325tr_pex *
1326tr_peerMgrCompactToPex( const void *    compact,
1327                        size_t          compactLen,
1328                        const uint8_t * added_f,
1329                        size_t          added_f_len,
1330                        size_t *        pexCount )
1331{
1332    size_t          i;
1333    size_t          n = compactLen / 6;
1334    const uint8_t * walk = compact;
1335    tr_pex *        pex = tr_new0( tr_pex, n );
1336
1337    for( i = 0; i < n; ++i )
1338    {
1339        pex[i].addr.type = TR_AF_INET;
1340        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
1341        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1342        if( added_f && ( n == added_f_len ) )
1343            pex[i].flags = added_f[i];
1344    }
1345
1346    *pexCount = n;
1347    return pex;
1348}
1349
1350tr_pex *
1351tr_peerMgrCompact6ToPex( const void    * compact,
1352                         size_t          compactLen,
1353                         const uint8_t * added_f,
1354                         size_t          added_f_len,
1355                         size_t        * pexCount )
1356{
1357    size_t          i;
1358    size_t          n = compactLen / 18;
1359    const uint8_t * walk = compact;
1360    tr_pex *        pex = tr_new0( tr_pex, n );
1361   
1362    for( i = 0; i < n; ++i )
1363    {
1364        pex[i].addr.type = TR_AF_INET6;
1365        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
1366        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1367        if( added_f && ( n == added_f_len ) )
1368            pex[i].flags = added_f[i];
1369    }
1370   
1371    *pexCount = n;
1372    return pex;
1373}
1374
1375tr_pex *
1376tr_peerMgrArrayToPex( const void * array,
1377                      size_t       arrayLen,
1378                      size_t      * pexCount )
1379{
1380    size_t          i;
1381    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
1382    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
1383    const uint8_t * walk = array;
1384    tr_pex        * pex = tr_new0( tr_pex, n );
1385   
1386    for( i = 0 ; i < n ; i++ ) {
1387        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
1388        tr_suspectAddress( &pex[i].addr, "tracker"  );
1389        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
1390        pex[i].flags = 0x00;
1391        walk += sizeof( tr_address ) + 2;
1392    }
1393   
1394    *pexCount = n;
1395    return pex;
1396}
1397
1398/**
1399***
1400**/
1401
1402void
1403tr_peerMgrSetBlame( tr_torrent     * tor,
1404                    tr_piece_index_t pieceIndex,
1405                    int              success )
1406{
1407    if( !success )
1408    {
1409        int        peerCount, i;
1410        Torrent *  t = tor->torrentPeers;
1411        tr_peer ** peers;
1412
1413        assert( torrentIsLocked( t ) );
1414
1415        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
1416        for( i = 0; i < peerCount; ++i )
1417        {
1418            tr_peer * peer = peers[i];
1419            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1420            {
1421                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1422                        tr_peerIoAddrStr( &peer->addr, peer->port ),
1423                        pieceIndex, (int)peer->strikes + 1 );
1424                addStrike( t, peer );
1425            }
1426        }
1427    }
1428}
1429
1430int
1431tr_pexCompare( const void * va, const void * vb )
1432{
1433    const tr_pex * a = va;
1434    const tr_pex * b = vb;
1435    int i;
1436
1437    assert( tr_isPex( a ) );
1438    assert( tr_isPex( b ) );
1439
1440    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
1441        return i;
1442
1443    if( a->port != b->port )
1444        return a->port < b->port ? -1 : 1;
1445
1446    return 0;
1447}
1448
1449static int
1450peerPrefersCrypto( const tr_peer * peer )
1451{
1452    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1453        return TRUE;
1454
1455    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1456        return FALSE;
1457
1458    return tr_peerIoIsEncrypted( peer->io );
1459}
1460
1461int
1462tr_peerMgrGetPeers( tr_torrent      * tor,
1463                    tr_pex         ** setme_pex,
1464                    uint8_t           af)
1465{
1466    int peersReturning = 0;
1467    const Torrent * t = tor->torrentPeers;
1468
1469    managerLock( t->manager );
1470
1471    {
1472        int i;
1473        const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1474        const int peerCount = tr_ptrArraySize( &t->peers );
1475        /* for now, this will waste memory on torrents that have both
1476         * ipv6 and ipv4 peers */
1477        tr_pex * pex = tr_new( tr_pex, peerCount );
1478        tr_pex * walk = pex;
1479
1480        for( i=0; i<peerCount; ++i )
1481        {
1482            const tr_peer * peer = peers[i];
1483            if( peer->addr.type == af )
1484            {
1485                const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1486
1487                assert( tr_isAddress( &peer->addr ) );
1488                walk->addr = peer->addr;
1489                walk->port = peer->port;
1490                walk->flags = 0;
1491                if( peerPrefersCrypto( peer ) )
1492                    walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1493                if( ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 ) )
1494                    walk->flags |= ADDED_F_SEED_FLAG;
1495                ++peersReturning;
1496                ++walk;
1497            }
1498        }
1499
1500        assert( ( walk - pex ) == peersReturning );
1501        qsort( pex, peersReturning, sizeof( tr_pex ), tr_pexCompare );
1502        *setme_pex = pex;
1503    }
1504
1505    managerUnlock( t->manager );
1506    return peersReturning;
1507}
1508
1509static int reconnectPulse( void * vtorrent );
1510
1511static int rechokePulse( void * vtorrent );
1512
1513void
1514tr_peerMgrStartTorrent( tr_torrent * tor )
1515{
1516    Torrent * t = tor->torrentPeers;
1517
1518    managerLock( t->manager );
1519
1520    assert( t );
1521    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1522    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1523
1524    if( !t->isRunning )
1525    {
1526        t->isRunning = 1;
1527
1528        t->reconnectTimer = tr_timerNew( t->manager->session,
1529                                         reconnectPulse, t,
1530                                         RECONNECT_PERIOD_MSEC );
1531
1532        t->rechokeTimer = tr_timerNew( t->manager->session,
1533                                       rechokePulse, t,
1534                                       RECHOKE_PERIOD_MSEC );
1535
1536        reconnectPulse( t );
1537
1538        rechokePulse( t );
1539
1540        if( !tr_ptrArrayEmpty( &t->webseeds ) )
1541            refillSoon( t );
1542    }
1543
1544    managerUnlock( t->manager );
1545}
1546
1547static void
1548stopTorrent( Torrent * t )
1549{
1550    assert( torrentIsLocked( t ) );
1551
1552    t->isRunning = 0;
1553    tr_timerFree( &t->rechokeTimer );
1554    tr_timerFree( &t->reconnectTimer );
1555
1556    /* disconnect the peers. */
1557    tr_ptrArrayForeach( &t->peers, (PtrArrayForeachFunc)peerDestructor );
1558    tr_ptrArrayClear( &t->peers );
1559
1560    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1561     * which removes the handshake from t->outgoingHandshakes... */
1562    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
1563        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
1564}
1565
1566void
1567tr_peerMgrStopTorrent( tr_torrent * tor )
1568{
1569    Torrent * t = tor->torrentPeers;
1570
1571    managerLock( t->manager );
1572
1573    stopTorrent( t );
1574
1575    managerUnlock( t->manager );
1576}
1577
1578void
1579tr_peerMgrAddTorrent( tr_peerMgr * manager,
1580                      tr_torrent * tor )
1581{
1582    managerLock( manager );
1583
1584    assert( tor );
1585    assert( tor->torrentPeers == NULL );
1586
1587    tor->torrentPeers = torrentConstructor( manager, tor );
1588
1589    managerUnlock( manager );
1590}
1591
1592void
1593tr_peerMgrRemoveTorrent( tr_torrent * tor )
1594{
1595    tr_torrentLock( tor );
1596
1597    stopTorrent( tor->torrentPeers );
1598    torrentDestructor( tor->torrentPeers );
1599
1600    tr_torrentUnlock( tor );
1601}
1602
1603void
1604tr_peerMgrTorrentAvailability( const tr_torrent * tor,
1605                               int8_t           * tab,
1606                               unsigned int       tabCount )
1607{
1608    tr_piece_index_t   i;
1609    const Torrent *    t;
1610    float              interval;
1611    tr_bool            isSeed;
1612    int                peerCount;
1613    const tr_peer **   peers;
1614    tr_torrentLock( tor );
1615
1616    t = tor->torrentPeers;
1617    tor = t->tor;
1618    interval = tor->info.pieceCount / (float)tabCount;
1619    isSeed = tor && ( tr_cpGetStatus ( &tor->completion ) == TR_SEED );
1620    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1621    peerCount = tr_ptrArraySize( &t->peers );
1622
1623    memset( tab, 0, tabCount );
1624
1625    for( i = 0; tor && i < tabCount; ++i )
1626    {
1627        const int piece = i * interval;
1628
1629        if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
1630            tab[i] = -1;
1631        else if( peerCount ) {
1632            int j;
1633            for( j = 0; j < peerCount; ++j )
1634                if( tr_bitfieldHas( peers[j]->have, i ) )
1635                    ++tab[i];
1636        }
1637    }
1638
1639    tr_torrentUnlock( tor );
1640}
1641
1642/* Returns the pieces that are available from peers */
1643tr_bitfield*
1644tr_peerMgrGetAvailable( const tr_torrent * tor )
1645{
1646    int i;
1647    int peerCount;
1648    Torrent * t = tor->torrentPeers;
1649    const tr_peer ** peers;
1650    tr_bitfield * pieces;
1651    managerLock( t->manager );
1652
1653    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1654    peerCount = tr_ptrArraySize( &t->peers );
1655    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1656    for( i=0; i<peerCount; ++i )
1657        tr_bitfieldOr( pieces, peers[i]->have );
1658
1659    managerUnlock( t->manager );
1660    return pieces;
1661}
1662
1663void
1664tr_peerMgrTorrentStats( tr_torrent       * tor,
1665                        int              * setmePeersKnown,
1666                        int              * setmePeersConnected,
1667                        int              * setmeSeedsConnected,
1668                        int              * setmeWebseedsSendingToUs,
1669                        int              * setmePeersSendingToUs,
1670                        int              * setmePeersGettingFromUs,
1671                        int              * setmePeersFrom )
1672{
1673    int i, size;
1674    const Torrent * t = tor->torrentPeers;
1675    const tr_peer ** peers;
1676    const tr_webseed ** webseeds;
1677
1678    managerLock( t->manager );
1679
1680    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1681    size = tr_ptrArraySize( &t->peers );
1682
1683    *setmePeersKnown           = tr_ptrArraySize( &t->pool );
1684    *setmePeersConnected       = 0;
1685    *setmeSeedsConnected       = 0;
1686    *setmePeersGettingFromUs   = 0;
1687    *setmePeersSendingToUs     = 0;
1688    *setmeWebseedsSendingToUs  = 0;
1689
1690    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1691        setmePeersFrom[i] = 0;
1692
1693    for( i=0; i<size; ++i )
1694    {
1695        const tr_peer * peer = peers[i];
1696        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1697
1698        if( peer->io == NULL ) /* not connected */
1699            continue;
1700
1701        ++*setmePeersConnected;
1702
1703        ++setmePeersFrom[atom->from];
1704
1705        if( clientIsDownloadingFrom( peer ) )
1706            ++*setmePeersSendingToUs;
1707
1708        if( clientIsUploadingTo( peer ) )
1709            ++*setmePeersGettingFromUs;
1710
1711        if( atom->flags & ADDED_F_SEED_FLAG )
1712            ++*setmeSeedsConnected;
1713    }
1714
1715    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1716    size = tr_ptrArraySize( &t->webseeds );
1717    for( i=0; i<size; ++i )
1718        if( tr_webseedIsActive( webseeds[i] ) )
1719            ++*setmeWebseedsSendingToUs;
1720
1721    managerUnlock( t->manager );
1722}
1723
1724float*
1725tr_peerMgrWebSpeeds( const tr_torrent * tor )
1726{
1727    const Torrent * t = tor->torrentPeers;
1728    const tr_webseed ** webseeds;
1729    int i;
1730    int webseedCount;
1731    float * ret;
1732    uint64_t now;
1733
1734    assert( t->manager );
1735    managerLock( t->manager );
1736
1737    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1738    webseedCount = tr_ptrArraySize( &t->webseeds );
1739    assert( webseedCount == tor->info.webseedCount );
1740    ret = tr_new0( float, webseedCount );
1741    now = tr_date( );
1742
1743    for( i=0; i<webseedCount; ++i )
1744        if( !tr_webseedGetSpeed( webseeds[i], now, &ret[i] ) )
1745            ret[i] = -1.0;
1746
1747    managerUnlock( t->manager );
1748    return ret;
1749}
1750
1751double
1752tr_peerGetPieceSpeed( const tr_peer * peer, uint64_t now, tr_direction direction )
1753{
1754    return peer->io ? tr_peerIoGetPieceSpeed( peer->io, now, direction ) : 0.0;
1755}
1756
1757
1758struct tr_peer_stat *
1759tr_peerMgrPeerStats( const tr_torrent    * tor,
1760                     int                 * setmeCount )
1761{
1762    int i, size;
1763    const Torrent * t = tor->torrentPeers;
1764    const tr_peer ** peers;
1765    tr_peer_stat * ret;
1766    uint64_t now;
1767
1768    assert( t->manager );
1769    managerLock( t->manager );
1770
1771    size = tr_ptrArraySize( &t->peers );
1772    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1773    ret = tr_new0( tr_peer_stat, size );
1774    now = tr_date( );
1775
1776    for( i = 0; i < size; ++i )
1777    {
1778        char *                   pch;
1779        const tr_peer *          peer = peers[i];
1780        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1781        tr_peer_stat *           stat = ret + i;
1782        tr_address               norm_addr;
1783
1784        norm_addr = peer->addr;
1785        tr_normalizeV4Mapped( &norm_addr );
1786        tr_ntop( &norm_addr, stat->addr, sizeof( stat->addr ) );
1787        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
1788                   sizeof( stat->client ) );
1789        stat->port               = ntohs( peer->port );
1790        stat->from               = atom->from;
1791        stat->progress           = peer->progress;
1792        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1793        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER );
1794        stat->rateToClient       = tr_peerGetPieceSpeed( peer, now, TR_PEER_TO_CLIENT );
1795        stat->peerIsChoked       = peer->peerIsChoked;
1796        stat->peerIsInterested   = peer->peerIsInterested;
1797        stat->clientIsChoked     = peer->clientIsChoked;
1798        stat->clientIsInterested = peer->clientIsInterested;
1799        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1800        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1801        stat->isUploadingTo      = clientIsUploadingTo( peer );
1802        stat->isSeed             = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
1803
1804        pch = stat->flagStr;
1805        if( t->optimistic == peer ) *pch++ = 'O';
1806        if( stat->isDownloadingFrom ) *pch++ = 'D';
1807        else if( stat->clientIsInterested ) *pch++ = 'd';
1808        if( stat->isUploadingTo ) *pch++ = 'U';
1809        else if( stat->peerIsInterested ) *pch++ = 'u';
1810        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
1811        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1812        if( stat->isEncrypted ) *pch++ = 'E';
1813        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1814        if( stat->isIncoming ) *pch++ = 'I';
1815        *pch = '\0';
1816    }
1817
1818    *setmeCount = size;
1819
1820    managerUnlock( t->manager );
1821    return ret;
1822}
1823
1824/**
1825***
1826**/
1827
1828struct ChokeData
1829{
1830    tr_bool         doUnchoke;
1831    tr_bool         isInterested;
1832    tr_bool         isChoked;
1833    int             rate;
1834    tr_peer *       peer;
1835};
1836
1837static int
1838compareChoke( const void * va,
1839              const void * vb )
1840{
1841    const struct ChokeData * a = va;
1842    const struct ChokeData * b = vb;
1843
1844    if( a->rate != b->rate ) /* prefer higher overall speeds */
1845        return a->rate > b->rate ? -1 : 1;
1846
1847    if( a->isChoked != b->isChoked ) /* prefer unchoked */
1848        return a->isChoked ? 1 : -1;
1849
1850    return 0;
1851}
1852
1853static int
1854isNew( const tr_peer * peer )
1855{
1856    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1857}
1858
1859static int
1860isSame( const tr_peer * peer )
1861{
1862    return peer && peer->client && strstr( peer->client, "Transmission" );
1863}
1864
1865/**
1866***
1867**/
1868
1869static void
1870rechoke( Torrent * t )
1871{
1872    int i, size, unchokedInterested;
1873    const int peerCount = tr_ptrArraySize( &t->peers );
1874    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1875    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1876    const tr_session * session = t->manager->session;
1877    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
1878    const uint64_t now = tr_date( );
1879
1880    assert( torrentIsLocked( t ) );
1881
1882    /* sort the peers by preference and rate */
1883    for( i = 0, size = 0; i < peerCount; ++i )
1884    {
1885        tr_peer * peer = peers[i];
1886        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1887
1888        if( peer->progress >= 1.0 ) /* choke all seeds */
1889        {
1890            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1891        }
1892        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
1893        {
1894            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1895        }
1896        else if( chokeAll ) /* choke everyone if we're not uploading */
1897        {
1898            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1899        }
1900        else
1901        {
1902            struct ChokeData * n = &choke[size++];
1903            n->peer         = peer;
1904            n->isInterested = peer->peerIsInterested;
1905            n->isChoked     = peer->peerIsChoked;
1906            n->rate         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER ) * 1024;
1907        }
1908    }
1909
1910    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
1911
1912    /**
1913     * Reciprocation and number of uploads capping is managed by unchoking
1914     * the N peers which have the best upload rate and are interested.
1915     * This maximizes the client's download rate. These N peers are
1916     * referred to as downloaders, because they are interested in downloading
1917     * from the client.
1918     *
1919     * Peers which have a better upload rate (as compared to the downloaders)
1920     * but aren't interested get unchoked. If they become interested, the
1921     * downloader with the worst upload rate gets choked. If a client has
1922     * a complete file, it uses its upload rate rather than its download
1923     * rate to decide which peers to unchoke.
1924     */
1925    unchokedInterested = 0;
1926    for( i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i ) {
1927        choke[i].doUnchoke = 1;
1928        if( choke[i].isInterested )
1929            ++unchokedInterested;
1930    }
1931
1932    /* optimistic unchoke */
1933    if( i < size )
1934    {
1935        int n;
1936        struct ChokeData * c;
1937        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
1938
1939        for( ; i<size; ++i )
1940        {
1941            if( choke[i].isInterested )
1942            {
1943                const tr_peer * peer = choke[i].peer;
1944                int x = 1, y;
1945                if( isNew( peer ) ) x *= 3;
1946                if( isSame( peer ) ) x *= 3;
1947                for( y=0; y<x; ++y )
1948                    tr_ptrArrayAppend( &randPool, &choke[i] );
1949            }
1950        }
1951
1952        if(( n = tr_ptrArraySize( &randPool )))
1953        {
1954            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
1955            c->doUnchoke = 1;
1956            t->optimistic = c->peer;
1957        }
1958
1959        tr_ptrArrayDestruct( &randPool, NULL );
1960    }
1961
1962    for( i=0; i<size; ++i )
1963        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1964
1965    /* cleanup */
1966    tr_free( choke );
1967}
1968
1969static int
1970rechokePulse( void * vtorrent )
1971{
1972    Torrent * t = vtorrent;
1973
1974    torrentLock( t );
1975    rechoke( t );
1976    torrentUnlock( t );
1977    return TRUE;
1978}
1979
1980/***
1981****
1982****  Life and Death
1983****
1984***/
1985
1986static int
1987shouldPeerBeClosed( const Torrent * t,
1988                    const tr_peer * peer,
1989                    int             peerCount )
1990{
1991    const tr_torrent *       tor = t->tor;
1992    const time_t             now = time( NULL );
1993    const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1994
1995    /* if it's marked for purging, close it */
1996    if( peer->doPurge )
1997    {
1998        tordbg( t, "purging peer %s because its doPurge flag is set",
1999                tr_peerIoAddrStr( &atom->addr, atom->port ) );
2000        return TRUE;
2001    }
2002
2003    /* if we're seeding and the peer has everything we have,
2004     * and enough time has passed for a pex exchange, then disconnect */
2005    if( tr_torrentIsSeed( tor ) )
2006    {
2007        int peerHasEverything;
2008        if( atom->flags & ADDED_F_SEED_FLAG )
2009            peerHasEverything = TRUE;
2010        else if( peer->progress < tr_cpPercentDone( &tor->completion ) )
2011            peerHasEverything = FALSE;
2012        else {
2013            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( &tor->completion ) );
2014            tr_bitfieldDifference( tmp, peer->have );
2015            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2016            tr_bitfieldFree( tmp );
2017        }
2018
2019        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 )))
2020        {
2021            tordbg( t, "purging peer %s because we're both seeds",
2022                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
2023            return TRUE;
2024        }
2025    }
2026
2027    /* disconnect if it's been too long since piece data has been transferred.
2028     * this is on a sliding scale based on number of available peers... */
2029    {
2030        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2031        /* if we have >= relaxIfFewerThan, strictness is 100%.
2032         * if we have zero connections, strictness is 0% */
2033        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
2034                               ? 1.0
2035                               : peerCount / (float)relaxStrictnessIfFewerThanN;
2036        const int lo = MIN_UPLOAD_IDLE_SECS;
2037        const int hi = MAX_UPLOAD_IDLE_SECS;
2038        const int limit = hi - ( ( hi - lo ) * strictness );
2039        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
2040/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
2041        if( idleTime > limit ) {
2042            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
2043                       tr_peerIoAddrStr( &atom->addr, atom->port ), idleTime );
2044            return TRUE;
2045        }
2046    }
2047
2048    return FALSE;
2049}
2050
2051static tr_peer **
2052getPeersToClose( Torrent * t, int * setmeSize )
2053{
2054    int i, peerCount, outsize;
2055    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
2056    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2057
2058    assert( torrentIsLocked( t ) );
2059
2060    for( i = outsize = 0; i < peerCount; ++i )
2061        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
2062            ret[outsize++] = peers[i];
2063
2064    *setmeSize = outsize;
2065    return ret;
2066}
2067
2068static int
2069compareCandidates( const void * va,
2070                   const void * vb )
2071{
2072    const struct peer_atom * a = *(const struct peer_atom**) va;
2073    const struct peer_atom * b = *(const struct peer_atom**) vb;
2074
2075    /* <Charles> Here we would probably want to try reconnecting to
2076     * peers that had most recently given us data. Lots of users have
2077     * trouble with resets due to their routers and/or ISPs. This way we
2078     * can quickly recover from an unwanted reset. So we sort
2079     * piece_data_time in descending order.
2080     */
2081
2082    if( a->piece_data_time != b->piece_data_time )
2083        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2084
2085    if( a->numFails != b->numFails )
2086        return a->numFails < b->numFails ? -1 : 1;
2087
2088    if( a->time != b->time )
2089        return a->time < b->time ? -1 : 1;
2090
2091    /* all other things being equal, prefer peers whose
2092     * information comes from a more reliable source */
2093    if( a->from != b->from )
2094        return a->from < b->from ? -1 : 1;
2095
2096    return 0;
2097}
2098
2099static int
2100getReconnectIntervalSecs( const struct peer_atom * atom )
2101{
2102    int          sec;
2103    const time_t now = time( NULL );
2104
2105    /* if we were recently connected to this peer and transferring piece
2106     * data, try to reconnect to them sooner rather that later -- we don't
2107     * want network troubles to get in the way of a good peer. */
2108    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2109        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2110
2111    /* don't allow reconnects more often than our minimum */
2112    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2113        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2114
2115    /* otherwise, the interval depends on how many times we've tried
2116     * and failed to connect to the peer */
2117    else switch( atom->numFails ) {
2118        case 0: sec = 0; break;
2119        case 1: sec = 5; break;
2120        case 2: sec = 2 * 60; break;
2121        case 3: sec = 15 * 60; break;
2122        case 4: sec = 30 * 60; break;
2123        case 5: sec = 60 * 60; break;
2124        default: sec = 120 * 60; break;
2125    }
2126
2127    return sec;
2128}
2129
2130static struct peer_atom **
2131getPeerCandidates( Torrent * t, int * setmeSize )
2132{
2133    int                 i, atomCount, retCount;
2134    struct peer_atom ** atoms;
2135    struct peer_atom ** ret;
2136    const time_t        now = time( NULL );
2137    const int           seed = tr_torrentIsSeed( t->tor );
2138
2139    assert( torrentIsLocked( t ) );
2140
2141    atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
2142    ret = tr_new( struct peer_atom*, atomCount );
2143    for( i = retCount = 0; i < atomCount; ++i )
2144    {
2145        int                interval;
2146        struct peer_atom * atom = atoms[i];
2147
2148        /* peer fed us too much bad data ... we only keep it around
2149         * now to weed it out in case someone sends it to us via pex */
2150        if( atom->myflags & MYFLAG_BANNED )
2151            continue;
2152
2153        /* peer was unconnectable before, so we're not going to keep trying.
2154         * this is needs a separate flag from `banned', since if they try
2155         * to connect to us later, we'll let them in */
2156        if( atom->myflags & MYFLAG_UNREACHABLE )
2157            continue;
2158
2159        /* we don't need two connections to the same peer... */
2160        if( peerIsInUse( t, &atom->addr ) )
2161            continue;
2162
2163        /* no need to connect if we're both seeds... */
2164        if( seed && ( ( atom->flags & ADDED_F_SEED_FLAG ) ||
2165                      ( atom->uploadOnly == UPLOAD_ONLY_YES ) ) )
2166            continue;
2167
2168        /* don't reconnect too often */
2169        interval = getReconnectIntervalSecs( atom );
2170        if( ( now - atom->time ) < interval )
2171        {
2172            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2173                    i, tr_peerIoAddrStr( &atom->addr, atom->port ), interval );
2174            continue;
2175        }
2176
2177        /* Don't connect to peers in our blocklist */
2178        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2179            continue;
2180
2181        ret[retCount++] = atom;
2182    }
2183
2184    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2185    *setmeSize = retCount;
2186    return ret;
2187}
2188
2189static int
2190reconnectPulse( void * vtorrent )
2191{
2192    Torrent *     t = vtorrent;
2193    static time_t prevTime = 0;
2194    static int    newConnectionsThisSecond = 0;
2195    time_t        now;
2196
2197    torrentLock( t );
2198
2199    now = time( NULL );
2200    if( prevTime != now )
2201    {
2202        prevTime = now;
2203        newConnectionsThisSecond = 0;
2204    }
2205
2206    if( !t->isRunning )
2207    {
2208        removeAllPeers( t );
2209    }
2210    else
2211    {
2212        int i, nCandidates, nBad;
2213        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
2214        struct tr_peer ** connections = getPeersToClose( t, &nBad );
2215        int maxCandidates;
2216
2217        maxCandidates = nCandidates;
2218        maxCandidates = MAX( maxCandidates, MAX_RECONNECTIONS_PER_PULSE );
2219        maxCandidates = MAX( maxCandidates, getMaxPeerCount( t->tor ) - getPeerCount( t ) );
2220        maxCandidates = MAX( maxCandidates, MAX_CONNECTIONS_PER_SECOND - newConnectionsThisSecond );
2221
2222        //if( nBad || nCandidates )
2223            tordbg( t, "reconnect pulse for [%s]: %d bad connections, "
2224                    "%d connection candidates, %d atoms, max per pulse is %d",
2225                    t->tor->info.name, nBad, nCandidates,
2226                    tr_ptrArraySize( &t->pool ),
2227                    (int)MAX_RECONNECTIONS_PER_PULSE );
2228
2229        /* disconnect some peers to make room for better ones.
2230           if we transferred piece data, then they might be good peers,
2231           so reset their `numFails' weight to zero.  otherwise we connected
2232           to them fruitlessly, so mark it as another fail */
2233        for( i = 0; ( i < nBad ) && ( i < maxCandidates ) ; ++i ) {
2234            tr_peer * peer = connections[i];
2235            struct peer_atom * atom = getExistingAtom( t, &peer->addr );
2236            if( atom->piece_data_time )
2237                atom->numFails = 0;
2238            else
2239                ++atom->numFails;
2240            tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
2241            removePeer( t, peer );
2242        }
2243
2244tordbg( t, "nCandidates is %d, MAX_RECONNECTIONS_PER_PULSE is %d, getPeerCount(t) is %d, getMaxPeerCount(t) is %d, newConnectionsThisSecond is %d, MAX_CONNECTIONS_PER_SECOND is %d",
2245        (int)nCandidates, (int)MAX_RECONNECTIONS_PER_PULSE, (int)getPeerCount( t ), (int)getMaxPeerCount( t->tor ), (int)newConnectionsThisSecond, (int)MAX_CONNECTIONS_PER_SECOND );
2246
2247        /* add some new ones */
2248        for( i = 0;    ( i < nCandidates )
2249           && ( i < MAX_RECONNECTIONS_PER_PULSE )
2250           && ( getPeerCount( t ) < getMaxPeerCount( t->tor ) )
2251           && ( newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND ); ++i )
2252        {
2253            tr_peerMgr *       mgr = t->manager;
2254            struct peer_atom * atom = candidates[i];
2255            tr_peerIo *        io;
2256
2257            tordbg( t, "Starting an OUTGOING connection with %s",
2258                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2259
2260            io = tr_peerIoNewOutgoing( mgr->session, mgr->session->bandwidth, &atom->addr, atom->port, t->hash );
2261
2262            if( io == NULL )
2263            {
2264                tordbg( t, "peerIo not created; marking peer %s as unreachable",
2265                        tr_peerIoAddrStr( &atom->addr, atom->port ) );
2266                atom->myflags |= MYFLAG_UNREACHABLE;
2267            }
2268            else
2269            {
2270                tr_handshake * handshake = tr_handshakeNew( io,
2271                                                            mgr->session->encryptionMode,
2272                                                            myHandshakeDoneCB,
2273                                                            mgr );
2274
2275                assert( tr_peerIoGetTorrentHash( io ) );
2276
2277                tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewOutgoing() */
2278
2279                ++newConnectionsThisSecond;
2280
2281                tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
2282                                         handshakeCompare );
2283            }
2284
2285            atom->time = time( NULL );
2286        }
2287
2288        /* cleanup */
2289        tr_free( connections );
2290        tr_free( candidates );
2291    }
2292
2293    torrentUnlock( t );
2294    return TRUE;
2295}
2296
2297/****
2298*****
2299*****  BANDWIDTH ALLOCATION
2300*****
2301****/
2302
2303static void
2304pumpAllPeers( tr_peerMgr * mgr )
2305{
2306    tr_torrent * tor = NULL;
2307
2308    while(( tor = tr_torrentNext( mgr->session, tor )))
2309    {
2310        int j;
2311        Torrent * t = tor->torrentPeers;
2312
2313        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
2314        {
2315            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
2316            tr_peerMsgsPulse( peer->msgs );
2317        }
2318    }
2319}
2320
2321static int
2322bandwidthPulse( void * vmgr )
2323{
2324    tr_peerMgr * mgr = vmgr;
2325    managerLock( mgr );
2326
2327    /* FIXME: this next line probably isn't necessary... */
2328    pumpAllPeers( mgr );
2329
2330    /* allocate bandwidth to the peers */
2331    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
2332    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
2333
2334    managerUnlock( mgr );
2335    return TRUE;
2336}
Note: See TracBrowser for help on using the repository browser.