source: branches/1.5x/libtransmission/peer-mgr.c @ 7767

Last change on this file since 7767 was 7767, checked in by charles, 12 years ago

(1.5x) backport trunk changes to 1.5x branch

  • Property svn:keywords set to Date Rev Author Id
File size: 67.7 KB
Line 
1
2/*
3 * This file Copyright (C) 2007-2009 Charles Kerr <charles@transmissionbt.com>
4 *
5 * This file is licensed by the GPL version 2.  Works owned by the
6 * Transmission project are granted a special exemption to clause 2(b)
7 * so that the bulk of its code can remain under the MIT license.
8 * This exemption does not extend to derived works not owned by
9 * the Transmission project.
10 *
11 * $Id: peer-mgr.c 7767 2009-01-20 18:31:37Z charles $
12 */
13
14#include <assert.h>
15#include <errno.h>
16#include <string.h> /* memcpy, memcmp, strstr */
17#include <stdlib.h> /* qsort */
18#include <limits.h> /* INT_MAX */
19
20#include <event.h>
21
22#include "transmission.h"
23#include "session.h"
24#include "bandwidth.h"
25#include "bencode.h"
26#include "blocklist.h"
27#include "clients.h"
28#include "completion.h"
29#include "crypto.h"
30#include "fdlimit.h"
31#include "handshake.h"
32#include "inout.h" /* tr_ioTestPiece */
33#include "net.h"
34#include "peer-io.h"
35#include "peer-mgr.h"
36#include "peer-msgs.h"
37#include "ptrarray.h"
38#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
39#include "torrent.h"
40#include "trevent.h"
41#include "utils.h"
42#include "webseed.h"
43
44enum
45{
46    /* how frequently to change which peers are choked */
47    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
48
49    /* minimum interval for refilling peers' request lists */
50    REFILL_PERIOD_MSEC = 333,
51
52    /* when many peers are available, keep idle ones this long */
53    MIN_UPLOAD_IDLE_SECS = ( 30 ),
54
55    /* when few peers are available, keep idle ones this long */
56    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
57
58    /* how frequently to decide which peers live and die */
59    RECONNECT_PERIOD_MSEC = ( 2 * 1000 ),
60   
61    /* how frequently to reallocate bandwidth */
62    BANDWIDTH_PERIOD_MSEC = 500,
63
64    /* max # of peers to ask fer per torrent per reconnect pulse */
65    MAX_RECONNECTIONS_PER_PULSE = 16,
66
67    /* max number of peers to ask for per second overall.
68    * this throttle is to avoid overloading the router */
69    MAX_CONNECTIONS_PER_SECOND = 32,
70
71    /* number of bad pieces a peer is allowed to send before we ban them */
72    MAX_BAD_PIECES_PER_PEER = 5,
73
74    /* use for bitwise operations w/peer_atom.myflags */
75    MYFLAG_BANNED = 1,
76
77    /* unreachable for now... but not banned.
78     * if they try to connect to us it's okay */
79    MYFLAG_UNREACHABLE = 2,
80
81    /* the minimum we'll wait before attempting to reconnect to a peer */
82    MINIMUM_RECONNECT_INTERVAL_SECS = 5
83};
84
85
86/**
87***
88**/
89
90enum
91{
92    UPLOAD_ONLY_UKNOWN,
93    UPLOAD_ONLY_YES,
94    UPLOAD_ONLY_NO
95};
96
97/**
98 * Peer information that should be kept even before we've connected and
99 * after we've disconnected.  These are kept in a pool of peer_atoms to decide
100 * which ones would make good candidates for connecting to, and to watch out
101 * for banned peers.
102 *
103 * @see tr_peer
104 * @see tr_peermsgs
105 */
106struct peer_atom
107{
108    uint8_t     from;
109    uint8_t     flags;       /* these match the added_f flags */
110    uint8_t     myflags;     /* flags that aren't defined in added_f */
111    uint8_t     uploadOnly;  /* UPLOAD_ONLY_ */
112    tr_port     port;
113    uint16_t    numFails;
114    tr_address  addr;
115    time_t      time;        /* when the peer's connection status last changed */
116    time_t      piece_data_time;
117};
118
119typedef struct tr_torrent_peers
120{
121    tr_bool         isRunning;
122
123    uint8_t         hash[SHA_DIGEST_LENGTH];
124    int         *   pendingRequestCount;
125    tr_ptrArray     outgoingHandshakes; /* tr_handshake */
126    tr_ptrArray     pool; /* struct peer_atom */
127    tr_ptrArray     peers; /* tr_peer */
128    tr_ptrArray     webseeds; /* tr_webseed */
129    tr_timer *      reconnectTimer;
130    tr_timer *      rechokeTimer;
131    tr_timer *      refillTimer;
132    tr_torrent *    tor;
133    tr_peer *       optimistic; /* the optimistic peer, or NULL if none */
134
135    struct tr_peerMgr * manager;
136}
137Torrent;
138
139struct tr_peerMgr
140{
141    tr_session      * session;
142    tr_ptrArray       incomingHandshakes; /* tr_handshake */
143    tr_timer        * bandwidthTimer;
144};
145
146#define tordbg( t, ... ) \
147    do { \
148        if( tr_deepLoggingIsActive( ) ) \
149            tr_deepLog( __FILE__, __LINE__, t->tor->info.name, __VA_ARGS__ ); \
150    } while( 0 )
151
152#define dbgmsg( ... ) \
153    do { \
154        if( tr_deepLoggingIsActive( ) ) \
155            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
156    } while( 0 )
157
158/**
159***
160**/
161
162static TR_INLINE void
163managerLock( const struct tr_peerMgr * manager )
164{
165    tr_globalLock( manager->session );
166}
167
168static TR_INLINE void
169managerUnlock( const struct tr_peerMgr * manager )
170{
171    tr_globalUnlock( manager->session );
172}
173
174static TR_INLINE void
175torrentLock( Torrent * torrent )
176{
177    managerLock( torrent->manager );
178}
179
180static TR_INLINE void
181torrentUnlock( Torrent * torrent )
182{
183    managerUnlock( torrent->manager );
184}
185
186static TR_INLINE int
187torrentIsLocked( const Torrent * t )
188{
189    return tr_globalIsLocked( t->manager->session );
190}
191
192/**
193***
194**/
195
196static int
197handshakeCompareToAddr( const void * va, const void * vb )
198{
199    const tr_handshake * a = va;
200
201    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
202}
203
204static int
205handshakeCompare( const void * a, const void * b )
206{
207    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
208}
209
210static tr_handshake*
211getExistingHandshake( tr_ptrArray      * handshakes,
212                      const tr_address * addr )
213{
214    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
215}
216
217static int
218comparePeerAtomToAddress( const void * va, const void * vb )
219{
220    const struct peer_atom * a = va;
221
222    return tr_compareAddresses( &a->addr, vb );
223}
224
225static int
226comparePeerAtoms( const void * va, const void * vb )
227{
228    const struct peer_atom * b = vb;
229
230    return comparePeerAtomToAddress( va, &b->addr );
231}
232
233/**
234***
235**/
236
237static Torrent*
238getExistingTorrent( tr_peerMgr *    manager,
239                    const uint8_t * hash )
240{
241    tr_torrent * tor = tr_torrentFindFromHash( manager->session, hash );
242
243    return tor == NULL ? NULL : tor->torrentPeers;
244}
245
246static int
247peerCompare( const void * va, const void * vb )
248{
249    const tr_peer * a = va;
250    const tr_peer * b = vb;
251
252    return tr_compareAddresses( &a->addr, &b->addr );
253}
254
255static int
256peerCompareToAddr( const void * va, const void * vb )
257{
258    const tr_peer * a = va;
259
260    return tr_compareAddresses( &a->addr, vb );
261}
262
263static tr_peer*
264getExistingPeer( Torrent          * torrent,
265                 const tr_address * addr )
266{
267    assert( torrentIsLocked( torrent ) );
268    assert( addr );
269
270    return tr_ptrArrayFindSorted( &torrent->peers, addr, peerCompareToAddr );
271}
272
273static struct peer_atom*
274getExistingAtom( const Torrent    * t,
275                 const tr_address * addr )
276{
277    Torrent * tt = (Torrent*)t;
278    assert( torrentIsLocked( t ) );
279    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
280}
281
282static tr_bool
283peerIsInUse( const Torrent    * ct,
284             const tr_address * addr )
285{
286    Torrent * t = (Torrent*) ct;
287
288    assert( torrentIsLocked ( t ) );
289
290    return getExistingPeer( t, addr )
291        || getExistingHandshake( &t->outgoingHandshakes, addr )
292        || getExistingHandshake( &t->manager->incomingHandshakes, addr );
293}
294
295static tr_peer*
296peerConstructor( const tr_address * addr )
297{
298    tr_peer * p;
299    p = tr_new0( tr_peer, 1 );
300    p->addr = *addr;
301    return p;
302}
303
304static tr_peer*
305getPeer( Torrent          * torrent,
306         const tr_address * addr )
307{
308    tr_peer * peer;
309
310    assert( torrentIsLocked( torrent ) );
311
312    peer = getExistingPeer( torrent, addr );
313
314    if( peer == NULL )
315    {
316        peer = peerConstructor( addr );
317        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
318    }
319
320    return peer;
321}
322
323static void
324peerDestructor( tr_peer * peer )
325{
326    assert( peer );
327
328    if( peer->msgs != NULL )
329    {
330        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
331        tr_peerMsgsFree( peer->msgs );
332    }
333
334    tr_peerIoClear( peer->io );
335    tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
336
337    tr_bitfieldFree( peer->have );
338    tr_bitfieldFree( peer->blame );
339    tr_free( peer->client );
340
341    tr_free( peer );
342}
343
344static void
345removePeer( Torrent * t,
346            tr_peer * peer )
347{
348    tr_peer *          removed;
349    struct peer_atom * atom;
350
351    assert( torrentIsLocked( t ) );
352
353    atom = getExistingAtom( t, &peer->addr );
354    assert( atom );
355    atom->time = time( NULL );
356
357    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
358    assert( removed == peer );
359    peerDestructor( removed );
360}
361
362static void
363removeAllPeers( Torrent * t )
364{
365    while( !tr_ptrArrayEmpty( &t->peers ) )
366        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
367}
368
369static void
370torrentDestructor( void * vt )
371{
372    Torrent * t = vt;
373    uint8_t   hash[SHA_DIGEST_LENGTH];
374
375    assert( t );
376    assert( !t->isRunning );
377    assert( torrentIsLocked( t ) );
378    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
379    assert( tr_ptrArrayEmpty( &t->peers ) );
380
381    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
382
383    tr_timerFree( &t->reconnectTimer );
384    tr_timerFree( &t->rechokeTimer );
385    tr_timerFree( &t->refillTimer );
386
387    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
388    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
389    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
390    tr_ptrArrayDestruct( &t->peers, NULL );
391
392    tr_free( t->pendingRequestCount );
393    tr_free( t );
394}
395
396static void peerCallbackFunc( void * vpeer,
397                              void * vevent,
398                              void * vt );
399
400static Torrent*
401torrentConstructor( tr_peerMgr * manager,
402                    tr_torrent * tor )
403{
404    int       i;
405    Torrent * t;
406
407    t = tr_new0( Torrent, 1 );
408    t->manager = manager;
409    t->tor = tor;
410    t->pool = TR_PTR_ARRAY_INIT;
411    t->peers = TR_PTR_ARRAY_INIT;
412    t->webseeds = TR_PTR_ARRAY_INIT;
413    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
414    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
415
416    for( i = 0; i < tor->info.webseedCount; ++i )
417    {
418        tr_webseed * w =
419            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
420        tr_ptrArrayAppend( &t->webseeds, w );
421    }
422
423    return t;
424}
425
426
427static int bandwidthPulse( void * vmgr );
428
429
430tr_peerMgr*
431tr_peerMgrNew( tr_session * session )
432{
433    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
434
435    m->session = session;
436    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
437    m->bandwidthTimer = tr_timerNew( session, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
438    return m;
439}
440
441void
442tr_peerMgrFree( tr_peerMgr * manager )
443{
444    managerLock( manager );
445
446    tr_timerFree( &manager->bandwidthTimer );
447
448    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
449     * the item from manager->handshakes, so this is a little roundabout... */
450    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
451        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
452
453    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
454
455    managerUnlock( manager );
456    tr_free( manager );
457}
458
459static int
460clientIsDownloadingFrom( const tr_peer * peer )
461{
462    return peer->clientIsInterested && !peer->clientIsChoked;
463}
464
465static int
466clientIsUploadingTo( const tr_peer * peer )
467{
468    return peer->peerIsInterested && !peer->peerIsChoked;
469}
470
471/***
472****
473***/
474
475tr_bool
476tr_peerMgrPeerIsSeed( const tr_torrent  * tor,
477                      const tr_address  * addr )
478{
479    tr_bool isSeed = FALSE;
480    const Torrent * t = tor->torrentPeers;
481    const struct peer_atom * atom = getExistingAtom( t, addr );
482
483    if( atom )
484        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
485
486    return isSeed;
487}
488
489/****
490*****
491*****  REFILL
492*****
493****/
494
495static void
496assertValidPiece( Torrent * t, tr_piece_index_t piece )
497{
498    assert( t );
499    assert( t->tor );
500    assert( piece < t->tor->info.pieceCount );
501}
502
503static int
504getPieceRequests( Torrent * t, tr_piece_index_t piece )
505{
506    assertValidPiece( t, piece );
507
508    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
509}
510
511static void
512incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
513{
514    assertValidPiece( t, piece );
515
516    if( t->pendingRequestCount == NULL )
517        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
518    t->pendingRequestCount[piece]++;
519}
520
521static void
522decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
523{
524    assertValidPiece( t, piece );
525
526    if( t->pendingRequestCount )
527        t->pendingRequestCount[piece]--;
528}
529
530struct tr_refill_piece
531{
532    tr_priority_t    priority;
533    uint32_t         piece;
534    uint32_t         peerCount;
535    int              random;
536    int              pendingRequestCount;
537    int              missingBlockCount;
538};
539
540static int
541compareRefillPiece( const void * aIn, const void * bIn )
542{
543    const struct tr_refill_piece * a = aIn;
544    const struct tr_refill_piece * b = bIn;
545
546    /* if one piece has a higher priority, it goes first */
547    if( a->priority != b->priority )
548        return a->priority > b->priority ? -1 : 1;
549
550    /* have a per-priority endgame */
551    if( a->pendingRequestCount != b->pendingRequestCount )
552        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
553
554    /* fewer missing pieces goes first */
555    if( a->missingBlockCount != b->missingBlockCount )
556        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
557
558    /* otherwise if one has fewer peers, it goes first */
559    if( a->peerCount != b->peerCount )
560        return a->peerCount < b->peerCount ? -1 : 1;
561
562    /* otherwise go with our random seed */
563    if( a->random != b->random )
564        return a->random < b->random ? -1 : 1;
565
566    return 0;
567}
568
569static tr_piece_index_t *
570getPreferredPieces( Torrent * t, tr_piece_index_t * pieceCount )
571{
572    const tr_torrent  * tor = t->tor;
573    const tr_info     * inf = &tor->info;
574    tr_piece_index_t    i;
575    tr_piece_index_t    poolSize = 0;
576    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
577    int                 peerCount;
578    const tr_peer    ** peers;
579
580    assert( torrentIsLocked( t ) );
581
582    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
583    peerCount = tr_ptrArraySize( &t->peers );
584
585    /* make a list of the pieces that we want but don't have */
586    for( i = 0; i < inf->pieceCount; ++i )
587        if( !tor->info.pieces[i].dnd
588                && !tr_cpPieceIsComplete( &tor->completion, i ) )
589            pool[poolSize++] = i;
590
591    /* sort the pool by which to request next */
592    if( poolSize > 1 )
593    {
594        tr_piece_index_t j;
595        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
596
597        for( j = 0; j < poolSize; ++j )
598        {
599            int k;
600            const tr_piece_index_t piece = pool[j];
601            struct tr_refill_piece * setme = p + j;
602
603            setme->piece = piece;
604            setme->priority = inf->pieces[piece].priority;
605            setme->peerCount = 0;
606            setme->random = tr_cryptoWeakRandInt( INT_MAX );
607            setme->pendingRequestCount = getPieceRequests( t, piece );
608            setme->missingBlockCount
609                         = tr_cpMissingBlocksInPiece( &tor->completion, piece );
610
611            for( k = 0; k < peerCount; ++k )
612            {
613                const tr_peer * peer = peers[k];
614                if( peer->peerIsInterested
615                        && !peer->clientIsChoked
616                        && tr_bitfieldHas( peer->have, piece ) )
617                    ++setme->peerCount;
618            }
619        }
620
621        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
622               compareRefillPiece );
623
624        for( j = 0; j < poolSize; ++j )
625            pool[j] = p[j].piece;
626
627        tr_free( p );
628    }
629
630    *pieceCount = poolSize;
631    return pool;
632}
633
634struct tr_blockIterator
635{
636    Torrent * t;
637    tr_block_index_t blockIndex, blockCount, *blocks;
638    tr_piece_index_t pieceIndex, pieceCount, *pieces;
639};
640
641static struct tr_blockIterator*
642blockIteratorNew( Torrent * t )
643{
644    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
645    i->t = t;
646    i->pieces = getPreferredPieces( t, &i->pieceCount );
647    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCountInPiece );
648    return i;
649}
650
651static int
652blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
653{
654    int found;
655    Torrent * t = i->t;
656    tr_torrent * tor = t->tor;
657
658    while( ( i->blockIndex == i->blockCount )
659        && ( i->pieceIndex < i->pieceCount ) )
660    {
661        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
662        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
663        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
664        tr_block_index_t block;
665
666        assert( index < tor->info.pieceCount );
667
668        i->blockCount = 0;
669        i->blockIndex = 0;
670        for( block=b; block!=e; ++block )
671            if( !tr_cpBlockIsComplete( &tor->completion, block ) )
672                i->blocks[i->blockCount++] = block;
673    }
674
675    assert( i->blockCount <= tor->blockCountInPiece );
676
677    if(( found = ( i->blockIndex < i->blockCount )))
678        *setme = i->blocks[i->blockIndex++];
679
680    return found;
681}
682
683static void
684blockIteratorFree( struct tr_blockIterator * i )
685{
686    tr_free( i->blocks );
687    tr_free( i->pieces );
688    tr_free( i );
689}
690
691static tr_peer**
692getPeersUploadingToClient( Torrent * t,
693                           int *     setmeCount )
694{
695    int j;
696    int peerCount = 0;
697    int retCount = 0;
698    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
699    tr_peer ** ret = tr_new( tr_peer *, peerCount );
700
701    j = 0; /* this is a temporary test to make sure we walk through all the peers */
702    if( peerCount )
703    {
704        /* Get a list of peers we're downloading from.
705           Pick a different starting point each time so all peers
706           get a chance at being the first in line */
707        const int fencepost = tr_cryptoWeakRandInt( peerCount );
708        int i = fencepost;
709        do {
710            if( clientIsDownloadingFrom( peers[i] ) )
711                ret[retCount++] = peers[i];
712            i = ( i + 1 ) % peerCount;
713            ++j;
714        } while( i != fencepost );
715    }
716    assert( j == peerCount );
717    *setmeCount = retCount;
718    return ret;
719}
720
721static uint32_t
722getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
723{
724    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
725    const uint64_t blockPos = tor->blockSize * b;
726    assert( blockPos >= piecePos );
727    return (uint32_t)( blockPos - piecePos );
728}
729
730static int
731refillPulse( void * vtorrent )
732{
733    tr_block_index_t block;
734    int peerCount;
735    int webseedCount;
736    tr_peer ** peers;
737    tr_webseed ** webseeds;
738    struct tr_blockIterator * blockIterator;
739    Torrent * t = vtorrent;
740    tr_torrent * tor = t->tor;
741
742    if( !t->isRunning )
743        return TRUE;
744    if( tr_torrentIsSeed( t->tor ) )
745        return TRUE;
746
747    torrentLock( t );
748    tordbg( t, "Refilling Request Buffers..." );
749
750    blockIterator = blockIteratorNew( t );
751    peers = getPeersUploadingToClient( t, &peerCount );
752    webseedCount = tr_ptrArraySize( &t->webseeds );
753    webseeds = tr_memdup( tr_ptrArrayBase( &t->webseeds ),
754                          webseedCount * sizeof( tr_webseed* ) );
755
756    while( ( webseedCount || peerCount )
757        && blockIteratorNext( blockIterator, &block ) )
758    {
759        int j;
760        int handled = FALSE;
761
762        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
763        const uint32_t offset = getBlockOffsetInPiece( tor, block );
764        const uint32_t length = tr_torBlockCountBytes( tor, block );
765
766        /* find a peer who can ask for this block */
767        for( j=0; !handled && j<peerCount; )
768        {
769            const tr_addreq_t val = tr_peerMsgsAddRequest( peers[j]->msgs, index, offset, length );
770            switch( val )
771            {
772                case TR_ADDREQ_FULL:
773                case TR_ADDREQ_CLIENT_CHOKED:
774                    peers[j] = peers[--peerCount];
775                    break;
776
777                case TR_ADDREQ_MISSING:
778                case TR_ADDREQ_DUPLICATE:
779                    ++j;
780                    break;
781
782                case TR_ADDREQ_OK:
783                    incrementPieceRequests( t, index );
784                    handled = TRUE;
785                    break;
786
787                default:
788                    assert( 0 && "unhandled value" );
789                    break;
790            }
791        }
792
793        /* maybe one of the webseeds can do it */
794        for( j=0; !handled && j<webseedCount; )
795        {
796            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j], index, offset, length );
797            switch( val )
798            {
799                case TR_ADDREQ_FULL:
800                    webseeds[j] = webseeds[--webseedCount];
801                    break;
802
803                case TR_ADDREQ_OK:
804                    incrementPieceRequests( t, index );
805                    handled = TRUE;
806                    break;
807
808                default:
809                    assert( 0 && "unhandled value" );
810                    break;
811            }
812        }
813    }
814
815    /* cleanup */
816    blockIteratorFree( blockIterator );
817    tr_free( webseeds );
818    tr_free( peers );
819
820    t->refillTimer = NULL;
821    torrentUnlock( t );
822    return FALSE;
823}
824
825static void
826broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
827{
828    size_t i;
829    size_t peerCount;
830    tr_peer ** peers;
831
832    assert( torrentIsLocked( t ) );
833
834    tordbg( t, "got a block; cancelling any duplicate requests from peers %"PRIu32":%"PRIu32"->%"PRIu32, index, offset, length );
835
836    peerCount = tr_ptrArraySize( &t->peers );
837    peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
838    for( i=0; i<peerCount; ++i )
839        if( peers[i]->msgs )
840            tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
841}
842
843static void
844addStrike( Torrent * t,
845           tr_peer * peer )
846{
847    tordbg( t, "increasing peer %s strike count to %d",
848            tr_peerIoAddrStr( &peer->addr,
849                              peer->port ), peer->strikes + 1 );
850
851    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
852    {
853        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
854        atom->myflags |= MYFLAG_BANNED;
855        peer->doPurge = 1;
856        tordbg( t, "banning peer %s", tr_peerIoAddrStr( &atom->addr, atom->port ) );
857    }
858}
859
860static void
861gotBadPiece( Torrent *        t,
862             tr_piece_index_t pieceIndex )
863{
864    tr_torrent *   tor = t->tor;
865    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
866
867    tor->corruptCur += byteCount;
868    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
869}
870
871static void
872refillSoon( Torrent * t )
873{
874    if( t->refillTimer == NULL )
875        t->refillTimer = tr_timerNew( t->manager->session,
876                                      refillPulse, t,
877                                      REFILL_PERIOD_MSEC );
878}
879
880static void
881peerSuggestedPiece( Torrent            * t UNUSED,
882                    tr_peer            * peer UNUSED,
883                    tr_piece_index_t     pieceIndex UNUSED,
884                    int                  isFastAllowed UNUSED )
885{
886#if 0
887    assert( t );
888    assert( peer );
889    assert( peer->msgs );
890
891    /* is this a valid piece? */
892    if(  pieceIndex >= t->tor->info.pieceCount )
893        return;
894
895    /* don't ask for it if we've already got it */
896    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
897        return;
898
899    /* don't ask for it if they don't have it */
900    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
901        return;
902
903    /* don't ask for it if we're choked and it's not fast */
904    if( !isFastAllowed && peer->clientIsChoked )
905        return;
906
907    /* request the blocks that we don't have in this piece */
908    {
909        tr_block_index_t block;
910        const tr_torrent * tor = t->tor;
911        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
912        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
913
914        for( block=start; block<end; ++block )
915        {
916            if( !tr_cpBlockIsComplete( tor->completion, block ) )
917            {
918                const uint32_t offset = getBlockOffsetInPiece( tor, block );
919                const uint32_t length = tr_torBlockCountBytes( tor, block );
920                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
921                incrementPieceRequests( t, pieceIndex );
922            }
923        }
924    }
925#endif
926}
927
928static void
929peerCallbackFunc( void * vpeer, void * vevent, void * vt )
930{
931    tr_peer * peer = vpeer; /* may be NULL if peer is a webseed */
932    Torrent * t = vt;
933    const tr_peer_event * e = vevent;
934
935    torrentLock( t );
936
937    switch( e->eventType )
938    {
939        case TR_PEER_UPLOAD_ONLY:
940            /* update our atom */
941            if( peer ) {
942                struct peer_atom * a = getExistingAtom( t, &peer->addr );
943                a->uploadOnly = e->uploadOnly ? UPLOAD_ONLY_YES : UPLOAD_ONLY_NO;
944            }
945            break;
946
947        case TR_PEER_NEED_REQ:
948            refillSoon( t );
949            break;
950
951        case TR_PEER_CANCEL:
952            decrementPieceRequests( t, e->pieceIndex );
953            break;
954
955        case TR_PEER_PEER_GOT_DATA:
956        {
957            const time_t now = time( NULL );
958            tr_torrent * tor = t->tor;
959
960            tor->activityDate = now;
961
962            if( e->wasPieceData )
963                tor->uploadedCur += e->length;
964
965            /* update the stats */
966            if( e->wasPieceData )
967                tr_statsAddUploaded( tor->session, e->length );
968
969            /* update our atom */
970            if( peer ) {
971                struct peer_atom * a = getExistingAtom( t, &peer->addr );
972                if( e->wasPieceData )
973                    a->piece_data_time = now;
974            }
975
976            break;
977        }
978
979        case TR_PEER_CLIENT_GOT_SUGGEST:
980            if( peer )
981                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
982            break;
983
984        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
985            if( peer )
986                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
987            break;
988
989        case TR_PEER_CLIENT_GOT_DATA:
990        {
991            const time_t now = time( NULL );
992            tr_torrent * tor = t->tor;
993            tor->activityDate = now;
994
995            /* only add this to downloadedCur if we got it from a peer --
996             * webseeds shouldn't count against our ratio.  As one tracker
997             * admin put it, "Those pieces are downloaded directly from the
998             * content distributor, not the peers, it is the tracker's job
999             * to manage the swarms, not the web server and does not fit
1000             * into the jurisdiction of the tracker." */
1001            if( peer && e->wasPieceData )
1002                tor->downloadedCur += e->length;
1003
1004            /* update the stats */ 
1005            if( e->wasPieceData )
1006                tr_statsAddDownloaded( tor->session, e->length );
1007
1008            /* update our atom */
1009            if( peer ) {
1010                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1011                if( e->wasPieceData )
1012                    a->piece_data_time = now;
1013            }
1014
1015            break;
1016        }
1017
1018        case TR_PEER_PEER_PROGRESS:
1019        {
1020            if( peer )
1021            {
1022                struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1023                const int peerIsSeed = e->progress >= 1.0;
1024                if( peerIsSeed ) {
1025                    tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1026                    atom->flags |= ADDED_F_SEED_FLAG;
1027                } else {
1028                    tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1029                    atom->flags &= ~ADDED_F_SEED_FLAG;
1030                }
1031            }
1032            break;
1033        }
1034
1035        case TR_PEER_CLIENT_GOT_BLOCK:
1036        {
1037            tr_torrent * tor = t->tor;
1038
1039            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1040
1041            tr_cpBlockAdd( &tor->completion, block );
1042            decrementPieceRequests( t, e->pieceIndex );
1043
1044            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1045
1046            if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1047            {
1048                const tr_piece_index_t p = e->pieceIndex;
1049                const tr_bool ok = tr_ioTestPiece( tor, p, NULL, 0 );
1050
1051                if( !ok )
1052                {
1053                    tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1054                               (unsigned long)p );
1055                }
1056
1057                tr_torrentSetHasPiece( tor, p, ok );
1058                tr_torrentSetPieceChecked( tor, p, TRUE );
1059                tr_peerMgrSetBlame( tor, p, ok );
1060
1061                if( !ok )
1062                {
1063                    gotBadPiece( t, p );
1064                }
1065                else
1066                {
1067                    int i;
1068                    int peerCount;
1069                    tr_peer ** peers;
1070                    tr_file_index_t fileIndex;
1071
1072                    peerCount = tr_ptrArraySize( &t->peers );
1073                    peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1074                    for( i=0; i<peerCount; ++i )
1075                        tr_peerMsgsHave( peers[i]->msgs, p );
1076
1077                    for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex )
1078                    {
1079                        const tr_file * file = &tor->info.files[fileIndex];
1080                        if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) && tr_cpFileIsComplete( &tor->completion, fileIndex ) )
1081                        {
1082                            char * path = tr_buildPath( tor->downloadDir, file->name, NULL );
1083                            tordbg( t, "closing recently-completed file \"%s\"", path );
1084                            tr_fdFileClose( path );
1085                            tr_free( path );
1086                        }
1087                    }
1088                }
1089
1090                tr_torrentRecheckCompleteness( tor );
1091            }
1092            break;
1093        }
1094
1095        case TR_PEER_ERROR:
1096            if( e->err == EINVAL )
1097            {
1098                addStrike( t, peer );
1099                peer->doPurge = 1;
1100                tordbg( t, "setting %s doPurge flag because we got an EINVAL error", tr_peerIoAddrStr( &peer->addr, peer->port ) );
1101            }
1102            else if( ( e->err == ERANGE )
1103                  || ( e->err == EMSGSIZE )
1104                  || ( e->err == ENOTCONN ) )
1105            {
1106                /* some protocol error from the peer */
1107                peer->doPurge = 1;
1108                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error", tr_peerIoAddrStr( &peer->addr, peer->port ) );
1109            }
1110            else /* a local error, such as an IO error */
1111            {
1112                t->tor->error = e->err;
1113                tr_strlcpy( t->tor->errorString,
1114                            tr_strerror( t->tor->error ),
1115                            sizeof( t->tor->errorString ) );
1116                tr_torrentStop( t->tor );
1117            }
1118            break;
1119
1120        default:
1121            assert( 0 );
1122    }
1123
1124    torrentUnlock( t );
1125}
1126
1127static void
1128ensureAtomExists( Torrent          * t,
1129                  const tr_address * addr,
1130                  tr_port            port,
1131                  uint8_t            flags,
1132                  uint8_t            from )
1133{
1134    if( getExistingAtom( t, addr ) == NULL )
1135    {
1136        struct peer_atom * a;
1137        a = tr_new0( struct peer_atom, 1 );
1138        a->addr = *addr;
1139        a->port = port;
1140        a->flags = flags;
1141        a->from = from;
1142        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr( &a->addr, a->port ) );
1143        tr_ptrArrayInsertSorted( &t->pool, a, comparePeerAtoms );
1144    }
1145}
1146
1147static int
1148getMaxPeerCount( const tr_torrent * tor )
1149{
1150    return tor->maxConnectedPeers;
1151}
1152
1153static int
1154getPeerCount( const Torrent * t )
1155{
1156    return tr_ptrArraySize( &t->peers );// + tr_ptrArraySize( &t->outgoingHandshakes );
1157}
1158
1159/* FIXME: this is kind of a mess. */
1160static tr_bool
1161myHandshakeDoneCB( tr_handshake  * handshake,
1162                   tr_peerIo     * io,
1163                   int             isConnected,
1164                   const uint8_t * peer_id,
1165                   void          * vmanager )
1166{
1167    tr_bool            ok = isConnected;
1168    tr_bool            success = FALSE;
1169    tr_port            port;
1170    const tr_address * addr;
1171    tr_peerMgr       * manager = vmanager;
1172    Torrent          * t;
1173    tr_handshake     * ours;
1174
1175    assert( io );
1176    assert( isConnected == 0 || isConnected == 1 );
1177
1178    t = tr_peerIoHasTorrentHash( io )
1179        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1180        : NULL;
1181
1182    if( tr_peerIoIsIncoming ( io ) )
1183        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1184                                        handshake, handshakeCompare );
1185    else if( t )
1186        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1187                                        handshake, handshakeCompare );
1188    else
1189        ours = handshake;
1190
1191    assert( ours );
1192    assert( ours == handshake );
1193
1194    if( t )
1195        torrentLock( t );
1196
1197    addr = tr_peerIoGetAddress( io, &port );
1198
1199    if( !ok || !t || !t->isRunning )
1200    {
1201        if( t )
1202        {
1203            struct peer_atom * atom = getExistingAtom( t, addr );
1204            if( atom )
1205                ++atom->numFails;
1206        }
1207    }
1208    else /* looking good */
1209    {
1210        struct peer_atom * atom;
1211        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1212        atom = getExistingAtom( t, addr );
1213        atom->time = time( NULL );
1214        atom->piece_data_time = 0;
1215
1216        if( atom->myflags & MYFLAG_BANNED )
1217        {
1218            tordbg( t, "banned peer %s tried to reconnect",
1219                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
1220        }
1221        else if( tr_peerIoIsIncoming( io )
1222               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1223
1224        {
1225        }
1226        else
1227        {
1228            tr_peer * peer = getExistingPeer( t, addr );
1229
1230            if( peer ) /* we already have this peer */
1231            {
1232            }
1233            else
1234            {
1235                peer = getPeer( t, addr );
1236                tr_free( peer->client );
1237
1238                if( !peer_id )
1239                    peer->client = NULL;
1240                else {
1241                    char client[128];
1242                    tr_clientForId( client, sizeof( client ), peer_id );
1243                    peer->client = tr_strdup( client );
1244                }
1245
1246                peer->port = port;
1247                peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is
1248                                                                balanced by our unref in peerDestructor()  */
1249                tr_peerIoSetParent( peer->io, t->tor->bandwidth );
1250                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1251
1252                success = TRUE;
1253            }
1254        }
1255    }
1256
1257    if( t )
1258        torrentUnlock( t );
1259
1260    return success;
1261}
1262
1263void
1264tr_peerMgrAddIncoming( tr_peerMgr * manager,
1265                       tr_address * addr,
1266                       tr_port      port,
1267                       int          socket )
1268{
1269    managerLock( manager );
1270
1271    if( tr_sessionIsAddressBlocked( manager->session, addr ) )
1272    {
1273        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
1274        tr_netClose( socket );
1275    }
1276    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
1277    {
1278        tr_netClose( socket );
1279    }
1280    else /* we don't have a connetion to them yet... */
1281    {
1282        tr_peerIo *    io;
1283        tr_handshake * handshake;
1284
1285        io = tr_peerIoNewIncoming( manager->session, manager->session->bandwidth, addr, port, socket );
1286
1287        handshake = tr_handshakeNew( io,
1288                                     manager->session->encryptionMode,
1289                                     myHandshakeDoneCB,
1290                                     manager );
1291
1292        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
1293
1294        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
1295                                 handshakeCompare );
1296    }
1297
1298    managerUnlock( manager );
1299}
1300
1301static tr_bool
1302tr_isPex( const tr_pex * pex )
1303{
1304    return pex && tr_isAddress( &pex->addr );
1305}
1306
1307void
1308tr_peerMgrAddPex( tr_torrent   *  tor,
1309                  uint8_t         from,
1310                  const tr_pex *  pex )
1311{
1312    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
1313    {
1314        Torrent * t = tor->torrentPeers;
1315        managerLock( t->manager );
1316
1317        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
1318            if( tr_isValidPeerAddress( &pex->addr, pex->port ) )
1319                ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from );
1320
1321        managerUnlock( t->manager );
1322    }
1323}
1324
1325tr_pex *
1326tr_peerMgrCompactToPex( const void *    compact,
1327                        size_t          compactLen,
1328                        const uint8_t * added_f,
1329                        size_t          added_f_len,
1330                        size_t *        pexCount )
1331{
1332    size_t          i;
1333    size_t          n = compactLen / 6;
1334    const uint8_t * walk = compact;
1335    tr_pex *        pex = tr_new0( tr_pex, n );
1336
1337    for( i = 0; i < n; ++i )
1338    {
1339        pex[i].addr.type = TR_AF_INET;
1340        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
1341        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1342        if( added_f && ( n == added_f_len ) )
1343            pex[i].flags = added_f[i];
1344    }
1345
1346    *pexCount = n;
1347    return pex;
1348}
1349
1350tr_pex *
1351tr_peerMgrCompact6ToPex( const void    * compact,
1352                         size_t          compactLen,
1353                         const uint8_t * added_f,
1354                         size_t          added_f_len,
1355                         size_t        * pexCount )
1356{
1357    size_t          i;
1358    size_t          n = compactLen / 18;
1359    const uint8_t * walk = compact;
1360    tr_pex *        pex = tr_new0( tr_pex, n );
1361   
1362    for( i = 0; i < n; ++i )
1363    {
1364        pex[i].addr.type = TR_AF_INET6;
1365        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
1366        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1367        if( added_f && ( n == added_f_len ) )
1368            pex[i].flags = added_f[i];
1369    }
1370   
1371    *pexCount = n;
1372    return pex;
1373}
1374
1375tr_pex *
1376tr_peerMgrArrayToPex( const void * array,
1377                      size_t       arrayLen,
1378                      size_t      * pexCount )
1379{
1380    size_t          i;
1381    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
1382    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
1383    const uint8_t * walk = array;
1384    tr_pex        * pex = tr_new0( tr_pex, n );
1385   
1386    for( i = 0 ; i < n ; i++ ) {
1387        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
1388        tr_suspectAddress( &pex[i].addr, "tracker"  );
1389        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
1390        pex[i].flags = 0x00;
1391        walk += sizeof( tr_address ) + 2;
1392    }
1393   
1394    *pexCount = n;
1395    return pex;
1396}
1397
1398/**
1399***
1400**/
1401
1402void
1403tr_peerMgrSetBlame( tr_torrent     * tor,
1404                    tr_piece_index_t pieceIndex,
1405                    int              success )
1406{
1407    if( !success )
1408    {
1409        int        peerCount, i;
1410        Torrent *  t = tor->torrentPeers;
1411        tr_peer ** peers;
1412
1413        assert( torrentIsLocked( t ) );
1414
1415        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
1416        for( i = 0; i < peerCount; ++i )
1417        {
1418            tr_peer * peer = peers[i];
1419            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1420            {
1421                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1422                        tr_peerIoAddrStr( &peer->addr, peer->port ),
1423                        pieceIndex, (int)peer->strikes + 1 );
1424                addStrike( t, peer );
1425            }
1426        }
1427    }
1428}
1429
1430int
1431tr_pexCompare( const void * va, const void * vb )
1432{
1433    const tr_pex * a = va;
1434    const tr_pex * b = vb;
1435    int i;
1436
1437    assert( tr_isPex( a ) );
1438    assert( tr_isPex( b ) );
1439
1440    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
1441        return i;
1442
1443    if( a->port != b->port )
1444        return a->port < b->port ? -1 : 1;
1445
1446    return 0;
1447}
1448
1449static int
1450peerPrefersCrypto( const tr_peer * peer )
1451{
1452    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1453        return TRUE;
1454
1455    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1456        return FALSE;
1457
1458    return tr_peerIoIsEncrypted( peer->io );
1459}
1460
1461int
1462tr_peerMgrGetPeers( tr_torrent      * tor,
1463                    tr_pex         ** setme_pex,
1464                    uint8_t           af)
1465{
1466    int peersReturning = 0;
1467    const Torrent * t = tor->torrentPeers;
1468
1469    managerLock( t->manager );
1470
1471    {
1472        int i;
1473        const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1474        const int peerCount = tr_ptrArraySize( &t->peers );
1475        /* for now, this will waste memory on torrents that have both
1476         * ipv6 and ipv4 peers */
1477        tr_pex * pex = tr_new( tr_pex, peerCount );
1478        tr_pex * walk = pex;
1479
1480        for( i=0; i<peerCount; ++i )
1481        {
1482            const tr_peer * peer = peers[i];
1483            if( peer->addr.type == af )
1484            {
1485                const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1486
1487                assert( tr_isAddress( &peer->addr ) );
1488                walk->addr = peer->addr;
1489                walk->port = peer->port;
1490                walk->flags = 0;
1491                if( peerPrefersCrypto( peer ) )
1492                    walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1493                if( ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 ) )
1494                    walk->flags |= ADDED_F_SEED_FLAG;
1495                ++peersReturning;
1496                ++walk;
1497            }
1498        }
1499
1500        assert( ( walk - pex ) == peersReturning );
1501        qsort( pex, peersReturning, sizeof( tr_pex ), tr_pexCompare );
1502        *setme_pex = pex;
1503    }
1504
1505    managerUnlock( t->manager );
1506    return peersReturning;
1507}
1508
1509static int reconnectPulse( void * vtorrent );
1510
1511static int rechokePulse( void * vtorrent );
1512
1513void
1514tr_peerMgrStartTorrent( tr_torrent * tor )
1515{
1516    Torrent * t = tor->torrentPeers;
1517
1518    managerLock( t->manager );
1519
1520    assert( t );
1521    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1522    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1523
1524    if( !t->isRunning )
1525    {
1526        t->isRunning = 1;
1527
1528        t->reconnectTimer = tr_timerNew( t->manager->session,
1529                                         reconnectPulse, t,
1530                                         RECONNECT_PERIOD_MSEC );
1531
1532        t->rechokeTimer = tr_timerNew( t->manager->session,
1533                                       rechokePulse, t,
1534                                       RECHOKE_PERIOD_MSEC );
1535
1536        reconnectPulse( t );
1537
1538        rechokePulse( t );
1539
1540        if( !tr_ptrArrayEmpty( &t->webseeds ) )
1541            refillSoon( t );
1542    }
1543
1544    managerUnlock( t->manager );
1545}
1546
1547static void
1548stopTorrent( Torrent * t )
1549{
1550    assert( torrentIsLocked( t ) );
1551
1552    t->isRunning = 0;
1553    tr_timerFree( &t->rechokeTimer );
1554    tr_timerFree( &t->reconnectTimer );
1555
1556    /* disconnect the peers. */
1557    tr_ptrArrayForeach( &t->peers, (PtrArrayForeachFunc)peerDestructor );
1558    tr_ptrArrayClear( &t->peers );
1559
1560    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1561     * which removes the handshake from t->outgoingHandshakes... */
1562    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
1563        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
1564}
1565
1566void
1567tr_peerMgrStopTorrent( tr_torrent * tor )
1568{
1569    Torrent * t = tor->torrentPeers;
1570
1571    managerLock( t->manager );
1572
1573    stopTorrent( t );
1574
1575    managerUnlock( t->manager );
1576}
1577
1578void
1579tr_peerMgrAddTorrent( tr_peerMgr * manager,
1580                      tr_torrent * tor )
1581{
1582    managerLock( manager );
1583
1584    assert( tor );
1585    assert( tor->torrentPeers == NULL );
1586
1587    tor->torrentPeers = torrentConstructor( manager, tor );
1588
1589    managerUnlock( manager );
1590}
1591
1592void
1593tr_peerMgrRemoveTorrent( tr_torrent * tor )
1594{
1595    tr_torrentLock( tor );
1596
1597    stopTorrent( tor->torrentPeers );
1598    torrentDestructor( tor->torrentPeers );
1599
1600    tr_torrentUnlock( tor );
1601}
1602
1603void
1604tr_peerMgrTorrentAvailability( const tr_torrent * tor,
1605                               int8_t           * tab,
1606                               unsigned int       tabCount )
1607{
1608    tr_piece_index_t   i;
1609    const Torrent *    t;
1610    float              interval;
1611    tr_bool            isSeed;
1612    int                peerCount;
1613    const tr_peer **   peers;
1614    tr_torrentLock( tor );
1615
1616    t = tor->torrentPeers;
1617    tor = t->tor;
1618    interval = tor->info.pieceCount / (float)tabCount;
1619    isSeed = tor && ( tr_cpGetStatus ( &tor->completion ) == TR_SEED );
1620    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1621    peerCount = tr_ptrArraySize( &t->peers );
1622
1623    memset( tab, 0, tabCount );
1624
1625    for( i = 0; tor && i < tabCount; ++i )
1626    {
1627        const int piece = i * interval;
1628
1629        if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
1630            tab[i] = -1;
1631        else if( peerCount ) {
1632            int j;
1633            for( j = 0; j < peerCount; ++j )
1634                if( tr_bitfieldHas( peers[j]->have, i ) )
1635                    ++tab[i];
1636        }
1637    }
1638
1639    tr_torrentUnlock( tor );
1640}
1641
1642/* Returns the pieces that are available from peers */
1643tr_bitfield*
1644tr_peerMgrGetAvailable( const tr_torrent * tor )
1645{
1646    int i;
1647    int peerCount;
1648    Torrent * t = tor->torrentPeers;
1649    const tr_peer ** peers;
1650    tr_bitfield * pieces;
1651    managerLock( t->manager );
1652
1653    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1654    peerCount = tr_ptrArraySize( &t->peers );
1655    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1656    for( i=0; i<peerCount; ++i )
1657        tr_bitfieldOr( pieces, peers[i]->have );
1658
1659    managerUnlock( t->manager );
1660    return pieces;
1661}
1662
1663void
1664tr_peerMgrTorrentStats( tr_torrent       * tor,
1665                        int              * setmePeersKnown,
1666                        int              * setmePeersConnected,
1667                        int              * setmeSeedsConnected,
1668                        int              * setmeWebseedsSendingToUs,
1669                        int              * setmePeersSendingToUs,
1670                        int              * setmePeersGettingFromUs,
1671                        int              * setmePeersFrom )
1672{
1673    int i, size;
1674    const Torrent * t = tor->torrentPeers;
1675    const tr_peer ** peers;
1676    const tr_webseed ** webseeds;
1677
1678    managerLock( t->manager );
1679
1680    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1681    size = tr_ptrArraySize( &t->peers );
1682
1683    *setmePeersKnown           = tr_ptrArraySize( &t->pool );
1684    *setmePeersConnected       = 0;
1685    *setmeSeedsConnected       = 0;
1686    *setmePeersGettingFromUs   = 0;
1687    *setmePeersSendingToUs     = 0;
1688    *setmeWebseedsSendingToUs  = 0;
1689
1690    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1691        setmePeersFrom[i] = 0;
1692
1693    for( i=0; i<size; ++i )
1694    {
1695        const tr_peer * peer = peers[i];
1696        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1697
1698        if( peer->io == NULL ) /* not connected */
1699            continue;
1700
1701        ++*setmePeersConnected;
1702
1703        ++setmePeersFrom[atom->from];
1704
1705        if( clientIsDownloadingFrom( peer ) )
1706            ++*setmePeersSendingToUs;
1707
1708        if( clientIsUploadingTo( peer ) )
1709            ++*setmePeersGettingFromUs;
1710
1711        if( atom->flags & ADDED_F_SEED_FLAG )
1712            ++*setmeSeedsConnected;
1713    }
1714
1715    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1716    size = tr_ptrArraySize( &t->webseeds );
1717    for( i=0; i<size; ++i )
1718        if( tr_webseedIsActive( webseeds[i] ) )
1719            ++*setmeWebseedsSendingToUs;
1720
1721    managerUnlock( t->manager );
1722}
1723
1724float*
1725tr_peerMgrWebSpeeds( const tr_torrent * tor )
1726{
1727    const Torrent * t = tor->torrentPeers;
1728    const tr_webseed ** webseeds;
1729    int i;
1730    int webseedCount;
1731    float * ret;
1732    uint64_t now;
1733
1734    assert( t->manager );
1735    managerLock( t->manager );
1736
1737    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1738    webseedCount = tr_ptrArraySize( &t->webseeds );
1739    assert( webseedCount == tor->info.webseedCount );
1740    ret = tr_new0( float, webseedCount );
1741    now = tr_date( );
1742
1743    for( i=0; i<webseedCount; ++i )
1744        if( !tr_webseedGetSpeed( webseeds[i], now, &ret[i] ) )
1745            ret[i] = -1.0;
1746
1747    managerUnlock( t->manager );
1748    return ret;
1749}
1750
1751double
1752tr_peerGetPieceSpeed( const tr_peer * peer, uint64_t now, tr_direction direction )
1753{
1754    return peer->io ? tr_peerIoGetPieceSpeed( peer->io, now, direction ) : 0.0;
1755}
1756
1757
1758struct tr_peer_stat *
1759tr_peerMgrPeerStats( const tr_torrent    * tor,
1760                     int                 * setmeCount )
1761{
1762    int i, size;
1763    const Torrent * t = tor->torrentPeers;
1764    const tr_peer ** peers;
1765    tr_peer_stat * ret;
1766    uint64_t now;
1767
1768    assert( t->manager );
1769    managerLock( t->manager );
1770
1771    size = tr_ptrArraySize( &t->peers );
1772    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1773    ret = tr_new0( tr_peer_stat, size );
1774    now = tr_date( );
1775
1776    for( i = 0; i < size; ++i )
1777    {
1778        char *                   pch;
1779        const tr_peer *          peer = peers[i];
1780        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1781        tr_peer_stat *           stat = ret + i;
1782        tr_address               norm_addr;
1783
1784        norm_addr = peer->addr;
1785        tr_normalizeV4Mapped( &norm_addr );
1786        tr_ntop( &norm_addr, stat->addr, sizeof( stat->addr ) );
1787        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
1788                   sizeof( stat->client ) );
1789        stat->port               = ntohs( peer->port );
1790        stat->from               = atom->from;
1791        stat->progress           = peer->progress;
1792        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1793        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER );
1794        stat->rateToClient       = tr_peerGetPieceSpeed( peer, now, TR_PEER_TO_CLIENT );
1795        stat->peerIsChoked       = peer->peerIsChoked;
1796        stat->peerIsInterested   = peer->peerIsInterested;
1797        stat->clientIsChoked     = peer->clientIsChoked;
1798        stat->clientIsInterested = peer->clientIsInterested;
1799        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1800        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1801        stat->isUploadingTo      = clientIsUploadingTo( peer );
1802        stat->isSeed             = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
1803
1804        pch = stat->flagStr;
1805        if( t->optimistic == peer ) *pch++ = 'O';
1806        if( stat->isDownloadingFrom ) *pch++ = 'D';
1807        else if( stat->clientIsInterested ) *pch++ = 'd';
1808        if( stat->isUploadingTo ) *pch++ = 'U';
1809        else if( stat->peerIsInterested ) *pch++ = 'u';
1810        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
1811        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1812        if( stat->isEncrypted ) *pch++ = 'E';
1813        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1814        if( stat->isIncoming ) *pch++ = 'I';
1815        *pch = '\0';
1816    }
1817
1818    *setmeCount = size;
1819
1820    managerUnlock( t->manager );
1821    return ret;
1822}
1823
1824/**
1825***
1826**/
1827
1828struct ChokeData
1829{
1830    tr_bool         doUnchoke;
1831    tr_bool         isInterested;
1832    tr_bool         isChoked;
1833    int             rate;
1834    tr_peer *       peer;
1835};
1836
1837static int
1838compareChoke( const void * va,
1839              const void * vb )
1840{
1841    const struct ChokeData * a = va;
1842    const struct ChokeData * b = vb;
1843
1844    if( a->rate != b->rate ) /* prefer higher overall speeds */
1845        return a->rate > b->rate ? -1 : 1;
1846
1847    if( a->isChoked != b->isChoked ) /* prefer unchoked */
1848        return a->isChoked ? 1 : -1;
1849
1850    return 0;
1851}
1852
1853static int
1854isNew( const tr_peer * peer )
1855{
1856    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1857}
1858
1859static int
1860isSame( const tr_peer * peer )
1861{
1862    return peer && peer->client && strstr( peer->client, "Transmission" );
1863}
1864
1865/**
1866***
1867**/
1868
1869static void
1870rechoke( Torrent * t )
1871{
1872    int i, size, unchokedInterested;
1873    const int peerCount = tr_ptrArraySize( &t->peers );
1874    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1875    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1876    const tr_session * session = t->manager->session;
1877    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
1878    const uint64_t now = tr_date( );
1879
1880    assert( torrentIsLocked( t ) );
1881
1882    /* sort the peers by preference and rate */
1883    for( i = 0, size = 0; i < peerCount; ++i )
1884    {
1885        tr_peer * peer = peers[i];
1886        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1887
1888        if( peer->progress >= 1.0 ) /* choke all seeds */
1889        {
1890            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1891        }
1892        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
1893        {
1894            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1895        }
1896        else if( chokeAll ) /* choke everyone if we're not uploading */
1897        {
1898            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1899        }
1900        else
1901        {
1902            struct ChokeData * n = &choke[size++];
1903            n->peer         = peer;
1904            n->isInterested = peer->peerIsInterested;
1905            n->isChoked     = peer->peerIsChoked;
1906            n->rate         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER ) * 1024;
1907        }
1908    }
1909
1910    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
1911
1912    /**
1913     * Reciprocation and number of uploads capping is managed by unchoking
1914     * the N peers which have the best upload rate and are interested.
1915     * This maximizes the client's download rate. These N peers are
1916     * referred to as downloaders, because they are interested in downloading
1917     * from the client.
1918     *
1919     * Peers which have a better upload rate (as compared to the downloaders)
1920     * but aren't interested get unchoked. If they become interested, the
1921     * downloader with the worst upload rate gets choked. If a client has
1922     * a complete file, it uses its upload rate rather than its download
1923     * rate to decide which peers to unchoke.
1924     */
1925    unchokedInterested = 0;
1926    for( i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i ) {
1927        choke[i].doUnchoke = 1;
1928        if( choke[i].isInterested )
1929            ++unchokedInterested;
1930    }
1931
1932    /* optimistic unchoke */
1933    if( i < size )
1934    {
1935        int n;
1936        struct ChokeData * c;
1937        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
1938
1939        for( ; i<size; ++i )
1940        {
1941            if( choke[i].isInterested )
1942            {
1943                const tr_peer * peer = choke[i].peer;
1944                int x = 1, y;
1945                if( isNew( peer ) ) x *= 3;
1946                if( isSame( peer ) ) x *= 3;
1947                for( y=0; y<x; ++y )
1948                    tr_ptrArrayAppend( &randPool, &choke[i] );
1949            }
1950        }
1951
1952        if(( n = tr_ptrArraySize( &randPool )))
1953        {
1954            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
1955            c->doUnchoke = 1;
1956            t->optimistic = c->peer;
1957        }
1958
1959        tr_ptrArrayDestruct( &randPool, NULL );
1960    }
1961
1962    for( i=0; i<size; ++i )
1963        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1964
1965    /* cleanup */
1966    tr_free( choke );
1967}
1968
1969static int
1970rechokePulse( void * vtorrent )
1971{
1972    Torrent * t = vtorrent;
1973
1974    torrentLock( t );
1975    rechoke( t );
1976    torrentUnlock( t );
1977    return TRUE;
1978}
1979
1980/***
1981****
1982****  Life and Death
1983****
1984***/
1985
1986typedef enum
1987{
1988    TR_CAN_KEEP,
1989    TR_CAN_CLOSE,
1990    TR_MUST_CLOSE,
1991}
1992tr_close_type_t;
1993
1994static tr_close_type_t
1995shouldPeerBeClosed( const Torrent    * t,
1996                    const tr_peer    * peer,
1997                    int                peerCount )
1998{
1999    const tr_torrent *       tor = t->tor;
2000    const time_t             now = time( NULL );
2001    const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
2002
2003    /* if it's marked for purging, close it */
2004    if( peer->doPurge )
2005    {
2006        tordbg( t, "purging peer %s because its doPurge flag is set",
2007                tr_peerIoAddrStr( &atom->addr, atom->port ) );
2008        return TR_MUST_CLOSE;
2009    }
2010
2011    /* if we're seeding and the peer has everything we have,
2012     * and enough time has passed for a pex exchange, then disconnect */
2013    if( tr_torrentIsSeed( tor ) )
2014    {
2015        int peerHasEverything;
2016        if( atom->flags & ADDED_F_SEED_FLAG )
2017            peerHasEverything = TRUE;
2018        else if( peer->progress < tr_cpPercentDone( &tor->completion ) )
2019            peerHasEverything = FALSE;
2020        else {
2021            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( &tor->completion ) );
2022            tr_bitfieldDifference( tmp, peer->have );
2023            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2024            tr_bitfieldFree( tmp );
2025        }
2026
2027        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 )))
2028        {
2029            tordbg( t, "purging peer %s because we're both seeds",
2030                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
2031            return TR_MUST_CLOSE;
2032        }
2033    }
2034
2035    /* disconnect if it's been too long since piece data has been transferred.
2036     * this is on a sliding scale based on number of available peers... */
2037    {
2038        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2039        /* if we have >= relaxIfFewerThan, strictness is 100%.
2040         * if we have zero connections, strictness is 0% */
2041        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
2042                               ? 1.0
2043                               : peerCount / (float)relaxStrictnessIfFewerThanN;
2044        const int lo = MIN_UPLOAD_IDLE_SECS;
2045        const int hi = MAX_UPLOAD_IDLE_SECS;
2046        const int limit = hi - ( ( hi - lo ) * strictness );
2047        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
2048/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
2049        if( idleTime > limit ) {
2050            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
2051                       tr_peerIoAddrStr( &atom->addr, atom->port ), idleTime );
2052            return TR_CAN_CLOSE;
2053        }
2054    }
2055
2056    return TR_CAN_KEEP;
2057}
2058
2059static tr_peer **
2060getPeersToClose( Torrent * t, tr_close_type_t closeType, int * setmeSize )
2061{
2062    int i, peerCount, outsize;
2063    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
2064    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2065
2066    assert( torrentIsLocked( t ) );
2067
2068    for( i = outsize = 0; i < peerCount; ++i )
2069        if( shouldPeerBeClosed( t, peers[i], peerCount ) == closeType )
2070            ret[outsize++] = peers[i];
2071
2072    *setmeSize = outsize;
2073    return ret;
2074}
2075
2076static int
2077compareCandidates( const void * va,
2078                   const void * vb )
2079{
2080    const struct peer_atom * a = *(const struct peer_atom**) va;
2081    const struct peer_atom * b = *(const struct peer_atom**) vb;
2082
2083    /* <Charles> Here we would probably want to try reconnecting to
2084     * peers that had most recently given us data. Lots of users have
2085     * trouble with resets due to their routers and/or ISPs. This way we
2086     * can quickly recover from an unwanted reset. So we sort
2087     * piece_data_time in descending order.
2088     */
2089
2090    if( a->piece_data_time != b->piece_data_time )
2091        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2092
2093    if( a->numFails != b->numFails )
2094        return a->numFails < b->numFails ? -1 : 1;
2095
2096    if( a->time != b->time )
2097        return a->time < b->time ? -1 : 1;
2098
2099    /* all other things being equal, prefer peers whose
2100     * information comes from a more reliable source */
2101    if( a->from != b->from )
2102        return a->from < b->from ? -1 : 1;
2103
2104    return 0;
2105}
2106
2107static int
2108getReconnectIntervalSecs( const struct peer_atom * atom )
2109{
2110    int          sec;
2111    const time_t now = time( NULL );
2112
2113    /* if we were recently connected to this peer and transferring piece
2114     * data, try to reconnect to them sooner rather that later -- we don't
2115     * want network troubles to get in the way of a good peer. */
2116    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2117        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2118
2119    /* don't allow reconnects more often than our minimum */
2120    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2121        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2122
2123    /* otherwise, the interval depends on how many times we've tried
2124     * and failed to connect to the peer */
2125    else switch( atom->numFails ) {
2126        case 0: sec = 0; break;
2127        case 1: sec = 5; break;
2128        case 2: sec = 2 * 60; break;
2129        case 3: sec = 15 * 60; break;
2130        case 4: sec = 30 * 60; break;
2131        case 5: sec = 60 * 60; break;
2132        default: sec = 120 * 60; break;
2133    }
2134
2135    return sec;
2136}
2137
2138static struct peer_atom **
2139getPeerCandidates( Torrent * t, int * setmeSize )
2140{
2141    int                 i, atomCount, retCount;
2142    struct peer_atom ** atoms;
2143    struct peer_atom ** ret;
2144    const time_t        now = time( NULL );
2145    const int           seed = tr_torrentIsSeed( t->tor );
2146
2147    assert( torrentIsLocked( t ) );
2148
2149    atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
2150    ret = tr_new( struct peer_atom*, atomCount );
2151    for( i = retCount = 0; i < atomCount; ++i )
2152    {
2153        int                interval;
2154        struct peer_atom * atom = atoms[i];
2155
2156        /* peer fed us too much bad data ... we only keep it around
2157         * now to weed it out in case someone sends it to us via pex */
2158        if( atom->myflags & MYFLAG_BANNED )
2159            continue;
2160
2161        /* peer was unconnectable before, so we're not going to keep trying.
2162         * this is needs a separate flag from `banned', since if they try
2163         * to connect to us later, we'll let them in */
2164        if( atom->myflags & MYFLAG_UNREACHABLE )
2165            continue;
2166
2167        /* we don't need two connections to the same peer... */
2168        if( peerIsInUse( t, &atom->addr ) )
2169            continue;
2170
2171        /* no need to connect if we're both seeds... */
2172        if( seed && ( ( atom->flags & ADDED_F_SEED_FLAG ) ||
2173                      ( atom->uploadOnly == UPLOAD_ONLY_YES ) ) )
2174            continue;
2175
2176        /* don't reconnect too often */
2177        interval = getReconnectIntervalSecs( atom );
2178        if( ( now - atom->time ) < interval )
2179        {
2180            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2181                    i, tr_peerIoAddrStr( &atom->addr, atom->port ), interval );
2182            continue;
2183        }
2184
2185        /* Don't connect to peers in our blocklist */
2186        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2187            continue;
2188
2189        ret[retCount++] = atom;
2190    }
2191
2192    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2193    *setmeSize = retCount;
2194    return ret;
2195}
2196
2197static void
2198closePeer( Torrent * t, tr_peer * peer )
2199{
2200    struct peer_atom * atom;
2201
2202    assert( t != NULL );
2203    assert( peer != NULL );
2204
2205    /* if we transferred piece data, then they might be good peers,
2206       so reset their `numFails' weight to zero.  otherwise we connected
2207       to them fruitlessly, so mark it as another fail */
2208    atom = getExistingAtom( t, &peer->addr );
2209    if( atom->piece_data_time )
2210        atom->numFails = 0;
2211    else
2212        ++atom->numFails;
2213
2214    tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
2215    removePeer( t, peer );
2216}
2217
2218static int
2219reconnectPulse( void * vtorrent )
2220{
2221    Torrent *     t = vtorrent;
2222    static time_t prevTime = 0;
2223    static int    newConnectionsThisSecond = 0;
2224    time_t        now;
2225
2226    torrentLock( t );
2227
2228    now = time( NULL );
2229    if( prevTime != now )
2230    {
2231        prevTime = now;
2232        newConnectionsThisSecond = 0;
2233    }
2234
2235    if( !t->isRunning )
2236    {
2237        removeAllPeers( t );
2238    }
2239    else
2240    {
2241        int i;
2242        int canCloseCount;
2243        int mustCloseCount;
2244        int candidateCount;
2245        int maxCandidates;
2246        struct tr_peer ** canClose = getPeersToClose( t, TR_CAN_CLOSE, &canCloseCount );
2247        struct tr_peer ** mustClose = getPeersToClose( t, TR_MUST_CLOSE, &mustCloseCount );
2248        struct peer_atom ** candidates = getPeerCandidates( t, &candidateCount );
2249
2250        tordbg( t, "reconnect pulse for [%s]: "
2251                   "%d must-close connections, "
2252                   "%d can-close connections, "
2253                   "%d connection candidates, "
2254                   "%d atoms, "
2255                   "max per pulse is %d",
2256                   t->tor->info.name,
2257                   mustCloseCount,
2258                   canCloseCount,
2259                   candidateCount,
2260                   tr_ptrArraySize( &t->pool ),
2261                   MAX_RECONNECTIONS_PER_PULSE );
2262
2263        /* disconnect the really bad peers */
2264        for( i=0; i<mustCloseCount; ++i )
2265            closePeer( t, mustClose[i] );
2266
2267        /* decide how many peers can we try to add in this pass */
2268        maxCandidates = candidateCount;
2269        maxCandidates = MIN( maxCandidates, MAX_RECONNECTIONS_PER_PULSE );
2270        maxCandidates = MIN( maxCandidates, getMaxPeerCount( t->tor ) - getPeerCount( t ) );
2271        maxCandidates = MIN( maxCandidates, MAX_CONNECTIONS_PER_SECOND - newConnectionsThisSecond );
2272
2273        /* maybe disconnect some lesser peers, if we have candidates to replace them with */
2274        for( i=0; ( i<canCloseCount ) && ( i<maxCandidates ); ++i )
2275            closePeer( t, canClose[i] );
2276
2277        tordbg( t, "candidateCount is %d, MAX_RECONNECTIONS_PER_PULSE is %d,"
2278                   " getPeerCount(t) is %d, getMaxPeerCount(t) is %d, "
2279                   "newConnectionsThisSecond is %d, MAX_CONNECTIONS_PER_SECOND is %d",
2280                   candidateCount,
2281                   MAX_RECONNECTIONS_PER_PULSE,
2282                   getPeerCount( t ),
2283                   getMaxPeerCount( t->tor ),
2284                   newConnectionsThisSecond, MAX_CONNECTIONS_PER_SECOND );
2285
2286        /* add some new ones */
2287        for( i=0; i<maxCandidates; ++i )
2288        {
2289            tr_peerMgr        * mgr = t->manager;
2290            struct peer_atom  * atom = candidates[i];
2291            tr_peerIo         * io;
2292
2293            tordbg( t, "Starting an OUTGOING connection with %s",
2294                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2295
2296            io = tr_peerIoNewOutgoing( mgr->session, mgr->session->bandwidth, &atom->addr, atom->port, t->hash );
2297
2298            if( io == NULL )
2299            {
2300                tordbg( t, "peerIo not created; marking peer %s as unreachable",
2301                        tr_peerIoAddrStr( &atom->addr, atom->port ) );
2302                atom->myflags |= MYFLAG_UNREACHABLE;
2303            }
2304            else
2305            {
2306                tr_handshake * handshake = tr_handshakeNew( io,
2307                                                            mgr->session->encryptionMode,
2308                                                            myHandshakeDoneCB,
2309                                                            mgr );
2310
2311                assert( tr_peerIoGetTorrentHash( io ) );
2312
2313                tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewOutgoing() */
2314
2315                ++newConnectionsThisSecond;
2316
2317                tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
2318                                         handshakeCompare );
2319            }
2320
2321            atom->time = time( NULL );
2322        }
2323
2324        /* cleanup */
2325        tr_free( candidates );
2326        tr_free( mustClose );
2327        tr_free( canClose );
2328    }
2329
2330    torrentUnlock( t );
2331    return TRUE;
2332}
2333
2334/****
2335*****
2336*****  BANDWIDTH ALLOCATION
2337*****
2338****/
2339
2340static void
2341pumpAllPeers( tr_peerMgr * mgr )
2342{
2343    tr_torrent * tor = NULL;
2344
2345    while(( tor = tr_torrentNext( mgr->session, tor )))
2346    {
2347        int j;
2348        Torrent * t = tor->torrentPeers;
2349
2350        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
2351        {
2352            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
2353            tr_peerMsgsPulse( peer->msgs );
2354        }
2355    }
2356}
2357
2358static int
2359bandwidthPulse( void * vmgr )
2360{
2361    tr_peerMgr * mgr = vmgr;
2362    managerLock( mgr );
2363
2364    /* FIXME: this next line probably isn't necessary... */
2365    pumpAllPeers( mgr );
2366
2367    /* allocate bandwidth to the peers */
2368    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
2369    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
2370
2371    managerUnlock( mgr );
2372    return TRUE;
2373}
Note: See TracBrowser for help on using the repository browser.