source: trunk/libtransmission/peer-mgr.c @ 7620

Last change on this file since 7620 was 7620, checked in by charles, 12 years ago

(trunk libT) yet another step in the debugging cycle, crash report from ZogG and Rolcol

  • Property svn:keywords set to Date Rev Author Id
File size: 68.2 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 7620 2009-01-05 07:57:10Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h> /* memcpy, memcmp, strstr */
16#include <stdlib.h> /* qsort */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "session.h"
23#include "bandwidth.h"
24#include "bencode.h"
25#include "blocklist.h"
26#include "clients.h"
27#include "completion.h"
28#include "crypto.h"
29#include "handshake.h"
30#include "inout.h" /* tr_ioTestPiece */
31#include "net.h"
32#include "peer-io.h"
33#include "peer-mgr.h"
34#include "peer-mgr-private.h"
35#include "peer-msgs.h"
36#include "ptrarray.h"
37#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
38#include "torrent.h"
39#include "trevent.h"
40#include "utils.h"
41#include "webseed.h"
42
43enum
44{
45    /* how frequently to change which peers are choked */
46    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
47
48    /* minimum interval for refilling peers' request lists */
49    REFILL_PERIOD_MSEC = 333,
50
51    /* when many peers are available, keep idle ones this long */
52    MIN_UPLOAD_IDLE_SECS = ( 30 ),
53
54    /* when few peers are available, keep idle ones this long */
55    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
56
57    /* how frequently to decide which peers live and die */
58    RECONNECT_PERIOD_MSEC = ( 2 * 1000 ),
59   
60    /* how frequently to reallocate bandwidth */
61    BANDWIDTH_PERIOD_MSEC = 500,
62
63    /* max # of peers to ask fer per torrent per reconnect pulse */
64    MAX_RECONNECTIONS_PER_PULSE = 16,
65
66    /* max number of peers to ask for per second overall.
67    * this throttle is to avoid overloading the router */
68    MAX_CONNECTIONS_PER_SECOND = 32,
69
70    /* number of unchoked peers per torrent.
71     * FIXME: this probably ought to be configurable */
72    MAX_UNCHOKED_PEERS = 14,
73
74    /* number of bad pieces a peer is allowed to send before we ban them */
75    MAX_BAD_PIECES_PER_PEER = 5,
76
77    /* use for bitwise operations w/peer_atom.myflags */
78    MYFLAG_BANNED = 1,
79
80    /* unreachable for now... but not banned.
81     * if they try to connect to us it's okay */
82    MYFLAG_UNREACHABLE = 2,
83
84    /* the minimum we'll wait before attempting to reconnect to a peer */
85    MINIMUM_RECONNECT_INTERVAL_SECS = 5
86};
87
88
89/**
90***
91**/
92
93enum
94{
95    UPLOAD_ONLY_UKNOWN,
96    UPLOAD_ONLY_YES,
97    UPLOAD_ONLY_NO
98};
99
100/**
101 * Peer information that should be kept even before we've connected and
102 * after we've disconnected.  These are kept in a pool of peer_atoms to decide
103 * which ones would make good candidates for connecting to, and to watch out
104 * for banned peers.
105 *
106 * @see tr_peer
107 * @see tr_peermsgs
108 */
109struct peer_atom
110{
111    uint8_t     from;
112    uint8_t     flags;       /* these match the added_f flags */
113    uint8_t     myflags;     /* flags that aren't defined in added_f */
114    uint8_t     uploadOnly;  /* UPLOAD_ONLY_ */
115    tr_port     port;
116    uint16_t    numFails;
117    tr_address  addr;
118    time_t      time;        /* when the peer's connection status last changed */
119    time_t      piece_data_time;
120};
121
122typedef struct
123{
124    tr_bool         isRunning;
125
126    uint8_t         hash[SHA_DIGEST_LENGTH];
127    int         *   pendingRequestCount;
128    tr_ptrArray     outgoingHandshakes; /* tr_handshake */
129    tr_ptrArray     pool; /* struct peer_atom */
130    tr_ptrArray     peers; /* tr_peer */
131    tr_ptrArray     webseeds; /* tr_webseed */
132    tr_timer *      reconnectTimer;
133    tr_timer *      rechokeTimer;
134    tr_timer *      refillTimer;
135    tr_torrent *    tor;
136    tr_peer *       optimistic; /* the optimistic peer, or NULL if none */
137
138    struct tr_peerMgr * manager;
139}
140Torrent;
141
142struct tr_peerMgr
143{
144    tr_session      * session;
145    tr_ptrArray       torrents; /* Torrent */
146    tr_ptrArray       incomingHandshakes; /* tr_handshake */
147    tr_timer        * bandwidthTimer;
148};
149
150#define tordbg( t, ... ) \
151    do { \
152        if( tr_deepLoggingIsActive( ) ) \
153            tr_deepLog( __FILE__, __LINE__, t->tor->info.name, __VA_ARGS__ ); \
154    } while( 0 )
155
156#define dbgmsg( ... ) \
157    do { \
158        if( tr_deepLoggingIsActive( ) ) \
159            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
160    } while( 0 )
161
162/**
163***
164**/
165
166static inline void
167managerLock( const struct tr_peerMgr * manager )
168{
169    tr_globalLock( manager->session );
170}
171
172static inline void
173managerUnlock( const struct tr_peerMgr * manager )
174{
175    tr_globalUnlock( manager->session );
176}
177
178static inline void
179torrentLock( Torrent * torrent )
180{
181    managerLock( torrent->manager );
182}
183
184static inline void
185torrentUnlock( Torrent * torrent )
186{
187    managerUnlock( torrent->manager );
188}
189
190static inline int
191torrentIsLocked( const Torrent * t )
192{
193    return tr_globalIsLocked( t->manager->session );
194}
195
196/**
197***
198**/
199
200static int
201handshakeCompareToAddr( const void * va, const void * vb )
202{
203    const tr_handshake * a = va;
204
205    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
206}
207
208static int
209handshakeCompare( const void * a, const void * b )
210{
211    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
212}
213
214static tr_handshake*
215getExistingHandshake( tr_ptrArray      * handshakes,
216                      const tr_address * addr )
217{
218    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
219}
220
221static int
222comparePeerAtomToAddress( const void * va, const void * vb )
223{
224    const struct peer_atom * a = va;
225
226    return tr_compareAddresses( &a->addr, vb );
227}
228
229static int
230comparePeerAtoms( const void * va, const void * vb )
231{
232    const struct peer_atom * b = vb;
233
234    return comparePeerAtomToAddress( va, &b->addr );
235}
236
237/**
238***
239**/
240
241static int
242torrentCompare( const void * va,
243                const void * vb )
244{
245    const Torrent * a = va;
246    const Torrent * b = vb;
247
248    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
249}
250
251static int
252torrentCompareToHash( const void * va,
253                      const void * vb )
254{
255    const Torrent * a = va;
256    const uint8_t * b_hash = vb;
257
258    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
259}
260
261static Torrent*
262getExistingTorrent( tr_peerMgr *    manager,
263                    const uint8_t * hash )
264{
265    return (Torrent*) tr_ptrArrayFindSorted( &manager->torrents,
266                                             hash,
267                                             torrentCompareToHash );
268}
269
270static int
271peerCompare( const void * va, const void * vb )
272{
273    const tr_peer * a = va;
274    const tr_peer * b = vb;
275
276    return tr_compareAddresses( &a->addr, &b->addr );
277}
278
279static int
280peerCompareToAddr( const void * va, const void * vb )
281{
282    const tr_peer * a = va;
283
284    return tr_compareAddresses( &a->addr, vb );
285}
286
287static tr_peer*
288getExistingPeer( Torrent          * torrent,
289                 const tr_address * addr )
290{
291    assert( torrentIsLocked( torrent ) );
292    assert( addr );
293
294    return tr_ptrArrayFindSorted( &torrent->peers, addr, peerCompareToAddr );
295}
296
297static struct peer_atom*
298getExistingAtom( const Torrent    * t,
299                 const tr_address * addr )
300{
301    Torrent * tt = (Torrent*)t;
302    assert( torrentIsLocked( t ) );
303    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
304}
305
306static tr_bool
307peerIsInUse( const Torrent    * ct,
308             const tr_address * addr )
309{
310    Torrent * t = (Torrent*) ct;
311
312    assert( torrentIsLocked ( t ) );
313
314    return getExistingPeer( t, addr )
315        || getExistingHandshake( &t->outgoingHandshakes, addr )
316        || getExistingHandshake( &t->manager->incomingHandshakes, addr );
317}
318
319static tr_peer*
320peerConstructor( const tr_address * addr )
321{
322    tr_peer * p;
323    p = tr_new0( tr_peer, 1 );
324    p->addr = *addr;
325    return p;
326}
327
328static tr_peer*
329getPeer( Torrent          * torrent,
330         const tr_address * addr )
331{
332    tr_peer * peer;
333
334    assert( torrentIsLocked( torrent ) );
335
336    peer = getExistingPeer( torrent, addr );
337
338    if( peer == NULL )
339    {
340        peer = peerConstructor( addr );
341        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
342    }
343
344    return peer;
345}
346
347static void
348peerDestructor( tr_peer * peer )
349{
350    assert( peer );
351
352    if( peer->msgs != NULL )
353    {
354        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
355        tr_peerMsgsFree( peer->msgs );
356    }
357
358    tr_peerIoSetIOFuncs( peer->io, NULL, NULL, NULL, NULL );
359    tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
360
361    tr_bitfieldFree( peer->have );
362    tr_bitfieldFree( peer->blame );
363    tr_free( peer->client );
364
365    tr_free( peer );
366}
367
368static void
369removePeer( Torrent * t,
370            tr_peer * peer )
371{
372    tr_peer *          removed;
373    struct peer_atom * atom;
374
375    assert( torrentIsLocked( t ) );
376
377    atom = getExistingAtom( t, &peer->addr );
378    assert( atom );
379    atom->time = time( NULL );
380
381    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
382    assert( removed == peer );
383    peerDestructor( removed );
384}
385
386static void
387removeAllPeers( Torrent * t )
388{
389    while( !tr_ptrArrayEmpty( &t->peers ) )
390        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
391}
392
393static void
394torrentDestructor( void * vt )
395{
396    Torrent * t = vt;
397    uint8_t   hash[SHA_DIGEST_LENGTH];
398
399    assert( t );
400    assert( !t->isRunning );
401    assert( torrentIsLocked( t ) );
402    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
403    assert( tr_ptrArrayEmpty( &t->peers ) );
404
405    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
406
407    tr_timerFree( &t->reconnectTimer );
408    tr_timerFree( &t->rechokeTimer );
409    tr_timerFree( &t->refillTimer );
410
411    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
412    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
413    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
414    tr_ptrArrayDestruct( &t->peers, NULL );
415
416    tr_free( t->pendingRequestCount );
417    tr_free( t );
418}
419
420static void peerCallbackFunc( void * vpeer,
421                              void * vevent,
422                              void * vt );
423
424static Torrent*
425torrentConstructor( tr_peerMgr * manager,
426                    tr_torrent * tor )
427{
428    int       i;
429    Torrent * t;
430
431    t = tr_new0( Torrent, 1 );
432    t->manager = manager;
433    t->tor = tor;
434    t->pool = TR_PTR_ARRAY_INIT;
435    t->peers = TR_PTR_ARRAY_INIT;
436    t->webseeds = TR_PTR_ARRAY_INIT;
437    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
438    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
439
440    for( i = 0; i < tor->info.webseedCount; ++i )
441    {
442        tr_webseed * w =
443            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
444        tr_ptrArrayAppend( &t->webseeds, w );
445    }
446
447    return t;
448}
449
450
451static int bandwidthPulse( void * vmgr );
452
453
454tr_peerMgr*
455tr_peerMgrNew( tr_session * session )
456{
457    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
458
459    m->session = session;
460    m->torrents = TR_PTR_ARRAY_INIT;
461    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
462    m->bandwidthTimer = tr_timerNew( session, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
463    return m;
464}
465
466void
467tr_peerMgrFree( tr_peerMgr * manager )
468{
469    managerLock( manager );
470
471    tr_timerFree( &manager->bandwidthTimer );
472
473    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
474     * the item from manager->handshakes, so this is a little roundabout... */
475    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
476        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
477
478    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
479
480    /* free the torrents. */
481    tr_ptrArrayDestruct( &manager->torrents, torrentDestructor );
482
483    managerUnlock( manager );
484    tr_free( manager );
485}
486
487static int
488clientIsDownloadingFrom( const tr_peer * peer )
489{
490    return peer->clientIsInterested && !peer->clientIsChoked;
491}
492
493static int
494clientIsUploadingTo( const tr_peer * peer )
495{
496    return peer->peerIsInterested && !peer->peerIsChoked;
497}
498
499/***
500****
501***/
502
503tr_bool
504tr_peerMgrPeerIsSeed( const tr_peerMgr  * mgr,
505                      const uint8_t     * torrentHash,
506                      const tr_address  * addr )
507{
508    tr_bool isSeed = FALSE;
509    const Torrent * t = NULL;
510    const struct peer_atom * atom = NULL;
511
512    t = getExistingTorrent( (tr_peerMgr*)mgr, torrentHash );
513    if( t )
514        atom = getExistingAtom( t, addr );
515    if( atom )
516        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
517
518    return isSeed;
519}
520
521/****
522*****
523*****  REFILL
524*****
525****/
526
527static void
528assertValidPiece( Torrent * t, tr_piece_index_t piece )
529{
530    assert( t );
531    assert( t->tor );
532    assert( piece < t->tor->info.pieceCount );
533}
534
535static int
536getPieceRequests( Torrent * t, tr_piece_index_t piece )
537{
538    assertValidPiece( t, piece );
539
540    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
541}
542
543static void
544incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
545{
546    assertValidPiece( t, piece );
547
548    if( t->pendingRequestCount == NULL )
549        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
550    t->pendingRequestCount[piece]++;
551}
552
553static void
554decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
555{
556    assertValidPiece( t, piece );
557
558    if( t->pendingRequestCount )
559        t->pendingRequestCount[piece]--;
560}
561
562struct tr_refill_piece
563{
564    tr_priority_t    priority;
565    uint32_t         piece;
566    uint32_t         peerCount;
567    int              random;
568    int              pendingRequestCount;
569    int              missingBlockCount;
570};
571
572static int
573compareRefillPiece( const void * aIn, const void * bIn )
574{
575    const struct tr_refill_piece * a = aIn;
576    const struct tr_refill_piece * b = bIn;
577
578    /* if one piece has a higher priority, it goes first */
579    if( a->priority != b->priority )
580        return a->priority > b->priority ? -1 : 1;
581
582    /* have a per-priority endgame */
583    if( a->pendingRequestCount != b->pendingRequestCount )
584        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
585
586    /* fewer missing pieces goes first */
587    if( a->missingBlockCount != b->missingBlockCount )
588        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
589
590    /* otherwise if one has fewer peers, it goes first */
591    if( a->peerCount != b->peerCount )
592        return a->peerCount < b->peerCount ? -1 : 1;
593
594    /* otherwise go with our random seed */
595    if( a->random != b->random )
596        return a->random < b->random ? -1 : 1;
597
598    return 0;
599}
600
601static tr_piece_index_t *
602getPreferredPieces( Torrent * t, tr_piece_index_t * pieceCount )
603{
604    const tr_torrent  * tor = t->tor;
605    const tr_info     * inf = &tor->info;
606    tr_piece_index_t    i;
607    tr_piece_index_t    poolSize = 0;
608    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
609    int                 peerCount;
610    const tr_peer    ** peers;
611
612    assert( torrentIsLocked( t ) );
613
614    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
615    peerCount = tr_ptrArraySize( &t->peers );
616
617    /* make a list of the pieces that we want but don't have */
618    for( i = 0; i < inf->pieceCount; ++i )
619        if( !tor->info.pieces[i].dnd
620                && !tr_cpPieceIsComplete( &tor->completion, i ) )
621            pool[poolSize++] = i;
622
623    /* sort the pool by which to request next */
624    if( poolSize > 1 )
625    {
626        tr_piece_index_t j;
627        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
628
629        for( j = 0; j < poolSize; ++j )
630        {
631            int k;
632            const tr_piece_index_t piece = pool[j];
633            struct tr_refill_piece * setme = p + j;
634
635            setme->piece = piece;
636            setme->priority = inf->pieces[piece].priority;
637            setme->peerCount = 0;
638            setme->random = tr_cryptoWeakRandInt( INT_MAX );
639            setme->pendingRequestCount = getPieceRequests( t, piece );
640            setme->missingBlockCount
641                         = tr_cpMissingBlocksInPiece( &tor->completion, piece );
642
643            for( k = 0; k < peerCount; ++k )
644            {
645                const tr_peer * peer = peers[k];
646                if( peer->peerIsInterested
647                        && !peer->clientIsChoked
648                        && tr_bitfieldHas( peer->have, piece ) )
649                    ++setme->peerCount;
650            }
651        }
652
653        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
654               compareRefillPiece );
655
656        for( j = 0; j < poolSize; ++j )
657            pool[j] = p[j].piece;
658
659        tr_free( p );
660    }
661
662    *pieceCount = poolSize;
663    return pool;
664}
665
666struct tr_blockIterator
667{
668    Torrent * t;
669    tr_block_index_t blockIndex, blockCount, *blocks;
670    tr_piece_index_t pieceIndex, pieceCount, *pieces;
671};
672
673static struct tr_blockIterator*
674blockIteratorNew( Torrent * t )
675{
676    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
677    i->t = t;
678    i->pieces = getPreferredPieces( t, &i->pieceCount );
679    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCountInPiece );
680    return i;
681}
682
683static int
684blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
685{
686    int found;
687    Torrent * t = i->t;
688    tr_torrent * tor = t->tor;
689
690    while( ( i->blockIndex == i->blockCount )
691        && ( i->pieceIndex < i->pieceCount ) )
692    {
693        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
694        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
695        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
696        tr_block_index_t block;
697
698        assert( index < tor->info.pieceCount );
699
700        i->blockCount = 0;
701        i->blockIndex = 0;
702        for( block=b; block!=e; ++block )
703            if( !tr_cpBlockIsComplete( &tor->completion, block ) )
704                i->blocks[i->blockCount++] = block;
705    }
706
707    assert( i->blockCount <= tor->blockCountInPiece );
708
709    if(( found = ( i->blockIndex < i->blockCount )))
710        *setme = i->blocks[i->blockIndex++];
711
712    return found;
713}
714
715static void
716blockIteratorFree( struct tr_blockIterator * i )
717{
718    tr_free( i->blocks );
719    tr_free( i->pieces );
720    tr_free( i );
721}
722
723static tr_peer**
724getPeersUploadingToClient( Torrent * t,
725                           int *     setmeCount )
726{
727    int j;
728    int peerCount = 0;
729    int retCount = 0;
730    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
731    tr_peer ** ret = tr_new( tr_peer *, peerCount );
732
733    j = 0; /* this is a temporary test to make sure we walk through all the peers */
734    if( peerCount )
735    {
736        /* Get a list of peers we're downloading from.
737           Pick a different starting point each time so all peers
738           get a chance at being the first in line */
739        const int fencepost = tr_cryptoWeakRandInt( peerCount );
740        int i = fencepost;
741        do {
742            if( clientIsDownloadingFrom( peers[i] ) )
743                ret[retCount++] = peers[i];
744            i = ( i + 1 ) % peerCount;
745            ++j;
746        } while( i != fencepost );
747    }
748    assert( j == peerCount );
749    *setmeCount = retCount;
750    return ret;
751}
752
753static uint32_t
754getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
755{
756    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
757    const uint64_t blockPos = tor->blockSize * b;
758    assert( blockPos >= piecePos );
759    return (uint32_t)( blockPos - piecePos );
760}
761
762static int
763refillPulse( void * vtorrent )
764{
765    tr_block_index_t block;
766    int peerCount;
767    int webseedCount;
768    tr_peer ** peers;
769    tr_webseed ** webseeds;
770    struct tr_blockIterator * blockIterator;
771    Torrent * t = vtorrent;
772    tr_torrent * tor = t->tor;
773
774    if( !t->isRunning )
775        return TRUE;
776    if( tr_torrentIsSeed( t->tor ) )
777        return TRUE;
778
779    torrentLock( t );
780    tordbg( t, "Refilling Request Buffers..." );
781
782    blockIterator = blockIteratorNew( t );
783    peers = getPeersUploadingToClient( t, &peerCount );
784    webseedCount = tr_ptrArraySize( &t->webseeds );
785    webseeds = tr_memdup( tr_ptrArrayBase( &t->webseeds ),
786                          webseedCount * sizeof( tr_webseed* ) );
787
788    while( ( webseedCount || peerCount )
789        && blockIteratorNext( blockIterator, &block ) )
790    {
791        int j;
792        int handled = FALSE;
793
794        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
795        const uint32_t offset = getBlockOffsetInPiece( tor, block );
796        const uint32_t length = tr_torBlockCountBytes( tor, block );
797
798        /* find a peer who can ask for this block */
799        for( j=0; !handled && j<peerCount; )
800        {
801            const tr_addreq_t val = tr_peerMsgsAddRequest( peers[j]->msgs, index, offset, length );
802            switch( val )
803            {
804                case TR_ADDREQ_FULL:
805                case TR_ADDREQ_CLIENT_CHOKED:
806                    peers[j] = peers[--peerCount];
807                    break;
808
809                case TR_ADDREQ_MISSING:
810                case TR_ADDREQ_DUPLICATE:
811                    ++j;
812                    break;
813
814                case TR_ADDREQ_OK:
815                    incrementPieceRequests( t, index );
816                    handled = TRUE;
817                    break;
818
819                default:
820                    assert( 0 && "unhandled value" );
821                    break;
822            }
823        }
824
825        /* maybe one of the webseeds can do it */
826        for( j=0; !handled && j<webseedCount; )
827        {
828            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j], index, offset, length );
829            switch( val )
830            {
831                case TR_ADDREQ_FULL:
832                    webseeds[j] = webseeds[--webseedCount];
833                    break;
834
835                case TR_ADDREQ_OK:
836                    incrementPieceRequests( t, index );
837                    handled = TRUE;
838                    break;
839
840                default:
841                    assert( 0 && "unhandled value" );
842                    break;
843            }
844        }
845    }
846
847    /* cleanup */
848    blockIteratorFree( blockIterator );
849    tr_free( webseeds );
850    tr_free( peers );
851
852    t->refillTimer = NULL;
853    torrentUnlock( t );
854    return FALSE;
855}
856
857static void
858broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
859{
860    size_t i;
861    size_t peerCount;
862    tr_peer ** peers;
863
864    assert( torrentIsLocked( t ) );
865
866    tordbg( t, "got a block; cancelling any duplicate requests from peers %"PRIu32":%"PRIu32"->%"PRIu32, index, offset, length );
867
868    peerCount = tr_ptrArraySize( &t->peers );
869    peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
870    for( i=0; i<peerCount; ++i )
871        if( peers[i]->msgs )
872            tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
873}
874
875static void
876addStrike( Torrent * t,
877           tr_peer * peer )
878{
879    tordbg( t, "increasing peer %s strike count to %d",
880            tr_peerIoAddrStr( &peer->addr,
881                              peer->port ), peer->strikes + 1 );
882
883    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
884    {
885        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
886        atom->myflags |= MYFLAG_BANNED;
887        peer->doPurge = 1;
888        tordbg( t, "banning peer %s", tr_peerIoAddrStr( &atom->addr, atom->port ) );
889    }
890}
891
892static void
893gotBadPiece( Torrent *        t,
894             tr_piece_index_t pieceIndex )
895{
896    tr_torrent *   tor = t->tor;
897    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
898
899    tor->corruptCur += byteCount;
900    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
901}
902
903static void
904refillSoon( Torrent * t )
905{
906    if( t->refillTimer == NULL )
907        t->refillTimer = tr_timerNew( t->manager->session,
908                                      refillPulse, t,
909                                      REFILL_PERIOD_MSEC );
910}
911
912static void
913peerSuggestedPiece( Torrent            * t UNUSED,
914                    tr_peer            * peer UNUSED,
915                    tr_piece_index_t     pieceIndex UNUSED,
916                    int                  isFastAllowed UNUSED )
917{
918#if 0
919    assert( t );
920    assert( peer );
921    assert( peer->msgs );
922
923    /* is this a valid piece? */
924    if(  pieceIndex >= t->tor->info.pieceCount )
925        return;
926
927    /* don't ask for it if we've already got it */
928    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
929        return;
930
931    /* don't ask for it if they don't have it */
932    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
933        return;
934
935    /* don't ask for it if we're choked and it's not fast */
936    if( !isFastAllowed && peer->clientIsChoked )
937        return;
938
939    /* request the blocks that we don't have in this piece */
940    {
941        tr_block_index_t block;
942        const tr_torrent * tor = t->tor;
943        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
944        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
945
946        for( block=start; block<end; ++block )
947        {
948            if( !tr_cpBlockIsComplete( tor->completion, block ) )
949            {
950                const uint32_t offset = getBlockOffsetInPiece( tor, block );
951                const uint32_t length = tr_torBlockCountBytes( tor, block );
952                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
953                incrementPieceRequests( t, pieceIndex );
954            }
955        }
956    }
957#endif
958}
959
960static void
961peerCallbackFunc( void * vpeer, void * vevent, void * vt )
962{
963    tr_peer * peer = vpeer; /* may be NULL if peer is a webseed */
964    Torrent * t = vt;
965    const tr_peer_event * e = vevent;
966
967    torrentLock( t );
968
969    switch( e->eventType )
970    {
971        case TR_PEER_UPLOAD_ONLY:
972            /* update our atom */
973            if( peer ) {
974                struct peer_atom * a = getExistingAtom( t, &peer->addr );
975                a->uploadOnly = e->uploadOnly ? UPLOAD_ONLY_YES : UPLOAD_ONLY_NO;
976            }
977            break;
978
979        case TR_PEER_NEED_REQ:
980            refillSoon( t );
981            break;
982
983        case TR_PEER_CANCEL:
984            decrementPieceRequests( t, e->pieceIndex );
985            break;
986
987        case TR_PEER_PEER_GOT_DATA:
988        {
989            const time_t now = time( NULL );
990            tr_torrent * tor = t->tor;
991
992            tor->activityDate = now;
993
994            if( e->wasPieceData )
995                tor->uploadedCur += e->length;
996
997            /* update the stats */
998            if( e->wasPieceData )
999                tr_statsAddUploaded( tor->session, e->length );
1000
1001            /* update our atom */
1002            if( peer ) {
1003                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1004                if( e->wasPieceData )
1005                    a->piece_data_time = now;
1006            }
1007
1008            break;
1009        }
1010
1011        case TR_PEER_CLIENT_GOT_SUGGEST:
1012            if( peer )
1013                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1014            break;
1015
1016        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1017            if( peer )
1018                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1019            break;
1020
1021        case TR_PEER_CLIENT_GOT_DATA:
1022        {
1023            const time_t now = time( NULL );
1024            tr_torrent * tor = t->tor;
1025            tor->activityDate = now;
1026
1027            /* only add this to downloadedCur if we got it from a peer --
1028             * webseeds shouldn't count against our ratio.  As one tracker
1029             * admin put it, "Those pieces are downloaded directly from the
1030             * content distributor, not the peers, it is the tracker's job
1031             * to manage the swarms, not the web server and does not fit
1032             * into the jurisdiction of the tracker." */
1033            if( peer && e->wasPieceData )
1034                tor->downloadedCur += e->length;
1035
1036            /* update the stats */ 
1037            if( e->wasPieceData )
1038                tr_statsAddDownloaded( tor->session, e->length );
1039
1040            /* update our atom */
1041            if( peer ) {
1042                struct peer_atom * a = getExistingAtom( t, &peer->addr );
1043                if( e->wasPieceData )
1044                    a->piece_data_time = now;
1045            }
1046
1047            break;
1048        }
1049
1050        case TR_PEER_PEER_PROGRESS:
1051        {
1052            if( peer )
1053            {
1054                struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1055                const int peerIsSeed = e->progress >= 1.0;
1056                if( peerIsSeed ) {
1057                    tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1058                    atom->flags |= ADDED_F_SEED_FLAG;
1059                } else {
1060                    tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
1061                    atom->flags &= ~ADDED_F_SEED_FLAG;
1062                }
1063            }
1064            break;
1065        }
1066
1067        case TR_PEER_CLIENT_GOT_BLOCK:
1068        {
1069            tr_torrent *     tor = t->tor;
1070
1071            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1072
1073            tr_cpBlockAdd( &tor->completion, block );
1074            decrementPieceRequests( t, e->pieceIndex );
1075
1076            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1077
1078            if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1079            {
1080                const tr_piece_index_t p = e->pieceIndex;
1081                const tr_bool ok = tr_ioTestPiece( tor, p, NULL, 0 );
1082
1083                if( !ok )
1084                {
1085                    tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1086                               (unsigned long)p );
1087                }
1088
1089                tr_torrentSetHasPiece( tor, p, ok );
1090                tr_torrentSetPieceChecked( tor, p, TRUE );
1091                tr_peerMgrSetBlame( tor->session->peerMgr, tor->info.hash, p, ok );
1092
1093                if( !ok )
1094                    gotBadPiece( t, p );
1095                else
1096                {
1097                    int i;
1098                    int peerCount = tr_ptrArraySize( &t->peers );
1099                    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1100                    for( i=0; i<peerCount; ++i )
1101                        tr_peerMsgsHave( peers[i]->msgs, p );
1102                }
1103
1104                tr_torrentRecheckCompleteness( tor );
1105            }
1106            break;
1107        }
1108
1109        case TR_PEER_ERROR:
1110            if( e->err == EINVAL )
1111            {
1112                addStrike( t, peer );
1113                peer->doPurge = 1;
1114                tordbg( t, "setting %s doPurge flag because we got an EINVAL error", tr_peerIoAddrStr( &peer->addr, peer->port ) );
1115            }
1116            else if( ( e->err == ERANGE )
1117                  || ( e->err == EMSGSIZE )
1118                  || ( e->err == ENOTCONN ) )
1119            {
1120                /* some protocol error from the peer */
1121                peer->doPurge = 1;
1122                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error", tr_peerIoAddrStr( &peer->addr, peer->port ) );
1123            }
1124            else /* a local error, such as an IO error */
1125            {
1126                t->tor->error = e->err;
1127                tr_strlcpy( t->tor->errorString,
1128                            tr_strerror( t->tor->error ),
1129                            sizeof( t->tor->errorString ) );
1130                tr_torrentStop( t->tor );
1131            }
1132            break;
1133
1134        default:
1135            assert( 0 );
1136    }
1137
1138    torrentUnlock( t );
1139}
1140
1141static void
1142ensureAtomExists( Torrent          * t,
1143                  const tr_address * addr,
1144                  tr_port            port,
1145                  uint8_t            flags,
1146                  uint8_t            from )
1147{
1148    if( getExistingAtom( t, addr ) == NULL )
1149    {
1150        struct peer_atom * a;
1151        a = tr_new0( struct peer_atom, 1 );
1152        a->addr = *addr;
1153        a->port = port;
1154        a->flags = flags;
1155        a->from = from;
1156        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr( &a->addr, a->port ) );
1157        tr_ptrArrayInsertSorted( &t->pool, a, comparePeerAtoms );
1158    }
1159}
1160
1161static int
1162getMaxPeerCount( const tr_torrent * tor )
1163{
1164    return tor->maxConnectedPeers;
1165}
1166
1167static int
1168getPeerCount( const Torrent * t )
1169{
1170    return tr_ptrArraySize( &t->peers );// + tr_ptrArraySize( &t->outgoingHandshakes );
1171}
1172
1173/* FIXME: this is kind of a mess. */
1174static tr_bool
1175myHandshakeDoneCB( tr_handshake  * handshake,
1176                   tr_peerIo     * io,
1177                   int             isConnected,
1178                   const uint8_t * peer_id,
1179                   void          * vmanager )
1180{
1181    tr_bool            ok = isConnected;
1182    tr_bool            success = FALSE;
1183    tr_port            port;
1184    const tr_address * addr;
1185    tr_peerMgr       * manager = vmanager;
1186    Torrent          * t;
1187    tr_handshake     * ours;
1188
1189    assert( io );
1190    assert( isConnected == 0 || isConnected == 1 );
1191
1192    t = tr_peerIoHasTorrentHash( io )
1193        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1194        : NULL;
1195
1196    if( tr_peerIoIsIncoming ( io ) )
1197        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1198                                        handshake, handshakeCompare );
1199    else if( t )
1200        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1201                                        handshake, handshakeCompare );
1202    else
1203        ours = handshake;
1204
1205    assert( ours );
1206    assert( ours == handshake );
1207
1208    if( t )
1209        torrentLock( t );
1210
1211    addr = tr_peerIoGetAddress( io, &port );
1212
1213    if( !ok || !t || !t->isRunning )
1214    {
1215        if( t )
1216        {
1217            struct peer_atom * atom = getExistingAtom( t, addr );
1218            if( atom )
1219                ++atom->numFails;
1220        }
1221    }
1222    else /* looking good */
1223    {
1224        struct peer_atom * atom;
1225        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1226        atom = getExistingAtom( t, addr );
1227        atom->time = time( NULL );
1228        atom->piece_data_time = 0;
1229
1230        if( atom->myflags & MYFLAG_BANNED )
1231        {
1232            tordbg( t, "banned peer %s tried to reconnect",
1233                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
1234        }
1235        else if( tr_peerIoIsIncoming( io )
1236               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1237
1238        {
1239        }
1240        else
1241        {
1242            tr_peer * peer = getExistingPeer( t, addr );
1243
1244            if( peer ) /* we already have this peer */
1245            {
1246            }
1247            else
1248            {
1249                peer = getPeer( t, addr );
1250                tr_free( peer->client );
1251
1252                if( !peer_id )
1253                    peer->client = NULL;
1254                else {
1255                    char client[128];
1256                    tr_clientForId( client, sizeof( client ), peer_id );
1257                    peer->client = tr_strdup( client );
1258                }
1259
1260                peer->port = port;
1261                peer->io = tr_handshakeStealIO( handshake );
1262                tr_peerIoRef( peer->io ); /* balanced by the unref in peerDestructor() */
1263                tr_peerIoSetParent( peer->io, t->tor->bandwidth );
1264                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1265
1266                success = TRUE;
1267            }
1268        }
1269    }
1270
1271    if( !success )
1272        tr_handshakeFree( handshake );
1273
1274    if( t )
1275        torrentUnlock( t );
1276
1277    return success;
1278}
1279
1280void
1281tr_peerMgrAddIncoming( tr_peerMgr * manager,
1282                       tr_address * addr,
1283                       tr_port      port,
1284                       int          socket )
1285{
1286    managerLock( manager );
1287
1288    if( tr_sessionIsAddressBlocked( manager->session, addr ) )
1289    {
1290        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
1291        tr_netClose( socket );
1292    }
1293    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
1294    {
1295        tr_netClose( socket );
1296    }
1297    else /* we don't have a connetion to them yet... */
1298    {
1299        tr_peerIo *    io;
1300        tr_handshake * handshake;
1301
1302        io = tr_peerIoNewIncoming( manager->session, manager->session->bandwidth, addr, port, socket );
1303
1304        handshake = tr_handshakeNew( io,
1305                                     manager->session->encryptionMode,
1306                                     myHandshakeDoneCB,
1307                                     manager );
1308
1309        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
1310
1311        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
1312                                 handshakeCompare );
1313    }
1314
1315    managerUnlock( manager );
1316}
1317
1318static tr_bool
1319tr_isPex( const tr_pex * pex )
1320{
1321    return pex && tr_isAddress( &pex->addr );
1322}
1323
1324void
1325tr_peerMgrAddPex( tr_peerMgr *    manager,
1326                  const uint8_t * torrentHash,
1327                  uint8_t         from,
1328                  const tr_pex *  pex )
1329{
1330    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
1331    {
1332        Torrent * t;
1333        managerLock( manager );
1334
1335        t = getExistingTorrent( manager, torrentHash );
1336        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
1337            ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from );
1338
1339        managerUnlock( manager );
1340    }
1341}
1342
1343tr_pex *
1344tr_peerMgrCompactToPex( const void *    compact,
1345                        size_t          compactLen,
1346                        const uint8_t * added_f,
1347                        size_t          added_f_len,
1348                        size_t *        pexCount )
1349{
1350    size_t          i;
1351    size_t          n = compactLen / 6;
1352    const uint8_t * walk = compact;
1353    tr_pex *        pex = tr_new0( tr_pex, n );
1354
1355    for( i = 0; i < n; ++i )
1356    {
1357        pex[i].addr.type = TR_AF_INET;
1358        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
1359        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1360        if( added_f && ( n == added_f_len ) )
1361            pex[i].flags = added_f[i];
1362    }
1363
1364    *pexCount = n;
1365    return pex;
1366}
1367
1368tr_pex *
1369tr_peerMgrCompact6ToPex( const void    * compact,
1370                         size_t          compactLen,
1371                         const uint8_t * added_f,
1372                         size_t          added_f_len,
1373                         size_t        * pexCount )
1374{
1375    size_t          i;
1376    size_t          n = compactLen / 18;
1377    const uint8_t * walk = compact;
1378    tr_pex *        pex = tr_new0( tr_pex, n );
1379   
1380    for( i = 0; i < n; ++i )
1381    {
1382        pex[i].addr.type = TR_AF_INET6;
1383        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
1384        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1385        if( added_f && ( n == added_f_len ) )
1386            pex[i].flags = added_f[i];
1387    }
1388   
1389    *pexCount = n;
1390    return pex;
1391}
1392
1393tr_pex *
1394tr_peerMgrArrayToPex( const void * array,
1395                      size_t       arrayLen,
1396                      size_t      * pexCount )
1397{
1398    size_t          i;
1399    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
1400    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
1401    const uint8_t * walk = array;
1402    tr_pex        * pex = tr_new0( tr_pex, n );
1403   
1404    for( i = 0 ; i < n ; i++ ) {
1405        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
1406        tr_suspectAddress( &pex[i].addr, "tracker"  );
1407        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
1408        pex[i].flags = 0x00;
1409        walk += sizeof( tr_address ) + 2;
1410    }
1411   
1412    *pexCount = n;
1413    return pex;
1414}
1415
1416/**
1417***
1418**/
1419
1420void
1421tr_peerMgrSetBlame( tr_peerMgr *     manager,
1422                    const uint8_t *  torrentHash,
1423                    tr_piece_index_t pieceIndex,
1424                    int              success )
1425{
1426    if( !success )
1427    {
1428        int        peerCount, i;
1429        Torrent *  t = getExistingTorrent( manager, torrentHash );
1430        tr_peer ** peers;
1431
1432        assert( torrentIsLocked( t ) );
1433
1434        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
1435        for( i = 0; i < peerCount; ++i )
1436        {
1437            tr_peer * peer = peers[i];
1438            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1439            {
1440                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1441                        tr_peerIoAddrStr( &peer->addr, peer->port ),
1442                        pieceIndex, (int)peer->strikes + 1 );
1443                addStrike( t, peer );
1444            }
1445        }
1446    }
1447}
1448
1449int
1450tr_pexCompare( const void * va, const void * vb )
1451{
1452    const tr_pex * a = va;
1453    const tr_pex * b = vb;
1454    int i;
1455
1456    assert( tr_isPex( a ) );
1457    assert( tr_isPex( b ) );
1458
1459    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
1460        return i;
1461
1462    if( a->port != b->port )
1463        return a->port < b->port ? -1 : 1;
1464
1465    return 0;
1466}
1467
1468static int
1469peerPrefersCrypto( const tr_peer * peer )
1470{
1471    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1472        return TRUE;
1473
1474    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1475        return FALSE;
1476
1477    return tr_peerIoIsEncrypted( peer->io );
1478}
1479
1480int
1481tr_peerMgrGetPeers( tr_peerMgr      * manager,
1482                    const uint8_t   * torrentHash,
1483                    tr_pex         ** setme_pex,
1484                    uint8_t           af)
1485{
1486    int peersReturning = 0;
1487    const Torrent *  t;
1488
1489    managerLock( manager );
1490
1491    t = getExistingTorrent( manager, torrentHash );
1492    if( t == NULL )
1493    {
1494        *setme_pex = NULL;
1495    }
1496    else
1497    {
1498        int i;
1499        const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1500        const int peerCount = tr_ptrArraySize( &t->peers );
1501        /* for now, this will waste memory on torrents that have both
1502         * ipv6 and ipv4 peers */
1503        tr_pex * pex = tr_new( tr_pex, peerCount );
1504        tr_pex * walk = pex;
1505
1506        for( i=0; i<peerCount; ++i )
1507        {
1508            const tr_peer * peer = peers[i];
1509            if( peer->addr.type == af )
1510            {
1511                const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1512
1513                assert( tr_isAddress( &peer->addr ) );
1514                walk->addr = peer->addr;
1515                walk->port = peer->port;
1516                walk->flags = 0;
1517                if( peerPrefersCrypto( peer ) )
1518                    walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1519                if( ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 ) )
1520                    walk->flags |= ADDED_F_SEED_FLAG;
1521                ++peersReturning;
1522                ++walk;
1523            }
1524        }
1525
1526        assert( ( walk - pex ) == peersReturning );
1527        qsort( pex, peersReturning, sizeof( tr_pex ), tr_pexCompare );
1528        *setme_pex = pex;
1529    }
1530
1531    managerUnlock( manager );
1532    return peersReturning;
1533}
1534
1535static int reconnectPulse( void * vtorrent );
1536
1537static int rechokePulse( void * vtorrent );
1538
1539void
1540tr_peerMgrStartTorrent( tr_peerMgr *    manager,
1541                        const uint8_t * torrentHash )
1542{
1543    Torrent * t;
1544
1545    managerLock( manager );
1546
1547    t = getExistingTorrent( manager, torrentHash );
1548
1549    assert( t );
1550    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1551    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1552
1553    if( !t->isRunning )
1554    {
1555        t->isRunning = 1;
1556
1557        t->reconnectTimer = tr_timerNew( t->manager->session,
1558                                         reconnectPulse, t,
1559                                         RECONNECT_PERIOD_MSEC );
1560
1561        t->rechokeTimer = tr_timerNew( t->manager->session,
1562                                       rechokePulse, t,
1563                                       RECHOKE_PERIOD_MSEC );
1564
1565        reconnectPulse( t );
1566
1567        rechokePulse( t );
1568
1569        if( !tr_ptrArrayEmpty( &t->webseeds ) )
1570            refillSoon( t );
1571    }
1572
1573    managerUnlock( manager );
1574}
1575
1576static void
1577stopTorrent( Torrent * t )
1578{
1579    assert( torrentIsLocked( t ) );
1580
1581    t->isRunning = 0;
1582    tr_timerFree( &t->rechokeTimer );
1583    tr_timerFree( &t->reconnectTimer );
1584
1585    /* disconnect the peers. */
1586    tr_ptrArrayForeach( &t->peers, (PtrArrayForeachFunc)peerDestructor );
1587    tr_ptrArrayClear( &t->peers );
1588
1589    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1590     * which removes the handshake from t->outgoingHandshakes... */
1591    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
1592        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
1593}
1594
1595void
1596tr_peerMgrStopTorrent( tr_peerMgr *    manager,
1597                       const uint8_t * torrentHash )
1598{
1599    managerLock( manager );
1600
1601    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1602
1603    managerUnlock( manager );
1604}
1605
1606void
1607tr_peerMgrAddTorrent( tr_peerMgr * manager,
1608                      tr_torrent * tor )
1609{
1610    Torrent * t;
1611
1612    managerLock( manager );
1613
1614    assert( tor );
1615    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1616
1617    t = torrentConstructor( manager, tor );
1618    tr_ptrArrayInsertSorted( &manager->torrents, t, torrentCompare );
1619
1620    managerUnlock( manager );
1621}
1622
1623void
1624tr_peerMgrRemoveTorrent( tr_peerMgr *    manager,
1625                         const uint8_t * torrentHash )
1626{
1627    Torrent * t;
1628
1629    managerLock( manager );
1630
1631    t = getExistingTorrent( manager, torrentHash );
1632    assert( t );
1633    stopTorrent( t );
1634    tr_ptrArrayRemoveSorted( &manager->torrents, t, torrentCompare );
1635    torrentDestructor( t );
1636
1637    managerUnlock( manager );
1638}
1639
1640void
1641tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1642                               const uint8_t *    torrentHash,
1643                               int8_t *           tab,
1644                               unsigned int       tabCount )
1645{
1646    tr_piece_index_t   i;
1647    const Torrent *    t;
1648    const tr_torrent * tor;
1649    float              interval;
1650    tr_bool            isSeed;
1651    int                peerCount;
1652    const tr_peer **   peers;
1653    managerLock( manager );
1654
1655    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1656    tor = t->tor;
1657    interval = tor->info.pieceCount / (float)tabCount;
1658    isSeed = tor && ( tr_cpGetStatus ( &tor->completion ) == TR_SEED );
1659    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1660    peerCount = tr_ptrArraySize( &t->peers );
1661
1662    memset( tab, 0, tabCount );
1663
1664    for( i = 0; tor && i < tabCount; ++i )
1665    {
1666        const int piece = i * interval;
1667
1668        if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
1669            tab[i] = -1;
1670        else if( peerCount ) {
1671            int j;
1672            for( j = 0; j < peerCount; ++j )
1673                if( tr_bitfieldHas( peers[j]->have, i ) )
1674                    ++tab[i];
1675        }
1676    }
1677
1678    managerUnlock( manager );
1679}
1680
1681/* Returns the pieces that are available from peers */
1682tr_bitfield*
1683tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1684                        const uint8_t *    torrentHash )
1685{
1686    int i;
1687    int peerCount;
1688    Torrent * t;
1689    const tr_peer ** peers;
1690    tr_bitfield * pieces;
1691    managerLock( manager );
1692
1693    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1694    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1695    peerCount = tr_ptrArraySize( &t->peers );
1696    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1697    for( i=0; i<peerCount; ++i )
1698        tr_bitfieldOr( pieces, peers[i]->have );
1699
1700    managerUnlock( manager );
1701    return pieces;
1702}
1703
1704int
1705tr_peerMgrHasConnections( const tr_peerMgr * manager,
1706                          const uint8_t *    torrentHash )
1707{
1708    int ret;
1709    const Torrent * t;
1710    managerLock( manager );
1711
1712    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1713    ret = t && ( !tr_ptrArrayEmpty( &t->peers ) || !tr_ptrArrayEmpty( &t->webseeds ) );
1714
1715    managerUnlock( manager );
1716    return ret;
1717}
1718
1719void
1720tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1721                        const uint8_t    * torrentHash,
1722                        int              * setmePeersKnown,
1723                        int              * setmePeersConnected,
1724                        int              * setmeSeedsConnected,
1725                        int              * setmeWebseedsSendingToUs,
1726                        int              * setmePeersSendingToUs,
1727                        int              * setmePeersGettingFromUs,
1728                        int              * setmePeersFrom )
1729{
1730    int i, size;
1731    const Torrent * t;
1732    const tr_peer ** peers;
1733    const tr_webseed ** webseeds;
1734
1735    managerLock( manager );
1736
1737    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1738    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1739    size = tr_ptrArraySize( &t->peers );
1740
1741    *setmePeersKnown           = tr_ptrArraySize( &t->pool );
1742    *setmePeersConnected       = 0;
1743    *setmeSeedsConnected       = 0;
1744    *setmePeersGettingFromUs   = 0;
1745    *setmePeersSendingToUs     = 0;
1746    *setmeWebseedsSendingToUs  = 0;
1747
1748    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1749        setmePeersFrom[i] = 0;
1750
1751    for( i=0; i<size; ++i )
1752    {
1753        const tr_peer * peer = peers[i];
1754        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1755
1756        if( peer->io == NULL ) /* not connected */
1757            continue;
1758
1759        ++*setmePeersConnected;
1760
1761        ++setmePeersFrom[atom->from];
1762
1763        if( clientIsDownloadingFrom( peer ) )
1764            ++*setmePeersSendingToUs;
1765
1766        if( clientIsUploadingTo( peer ) )
1767            ++*setmePeersGettingFromUs;
1768
1769        if( atom->flags & ADDED_F_SEED_FLAG )
1770            ++*setmeSeedsConnected;
1771    }
1772
1773    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1774    size = tr_ptrArraySize( &t->webseeds );
1775    for( i=0; i<size; ++i )
1776        if( tr_webseedIsActive( webseeds[i] ) )
1777            ++*setmeWebseedsSendingToUs;
1778
1779    managerUnlock( manager );
1780}
1781
1782float*
1783tr_peerMgrWebSpeeds( const tr_peerMgr * manager,
1784                     const uint8_t *    torrentHash )
1785{
1786    const Torrent * t;
1787    const tr_webseed ** webseeds;
1788    int i;
1789    int webseedCount;
1790    float * ret;
1791    uint64_t now;
1792
1793    assert( manager );
1794    managerLock( manager );
1795
1796    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1797    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
1798    webseedCount = tr_ptrArraySize( &t->webseeds );
1799    assert( webseedCount == t->tor->info.webseedCount );
1800    ret = tr_new0( float, webseedCount );
1801    now = tr_date( );
1802
1803    for( i=0; i<webseedCount; ++i )
1804        if( !tr_webseedGetSpeed( webseeds[i], now, &ret[i] ) )
1805            ret[i] = -1.0;
1806
1807    managerUnlock( manager );
1808    return ret;
1809}
1810
1811double
1812tr_peerGetPieceSpeed( const tr_peer * peer, uint64_t now, tr_direction direction )
1813{
1814    return peer->io ? tr_peerIoGetPieceSpeed( peer->io, now, direction ) : 0.0;
1815}
1816
1817
1818struct tr_peer_stat *
1819tr_peerMgrPeerStats( const   tr_peerMgr  * manager,
1820                     const   uint8_t     * torrentHash,
1821                     int                 * setmeCount )
1822{
1823    int i, size;
1824    const Torrent * t;
1825    const tr_peer ** peers;
1826    tr_peer_stat * ret;
1827    uint64_t now;
1828
1829    assert( manager );
1830    managerLock( manager );
1831
1832    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1833    size = tr_ptrArraySize( &t->peers );
1834    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1835    ret = tr_new0( tr_peer_stat, size );
1836    now = tr_date( );
1837
1838    for( i = 0; i < size; ++i )
1839    {
1840        char *                   pch;
1841        const tr_peer *          peer = peers[i];
1842        const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1843        tr_peer_stat *           stat = ret + i;
1844        tr_address               norm_addr;
1845
1846        norm_addr = peer->addr;
1847        tr_normalizeV4Mapped( &norm_addr );
1848        tr_ntop( &norm_addr, stat->addr, sizeof( stat->addr ) );
1849        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
1850                   sizeof( stat->client ) );
1851        stat->port               = ntohs( peer->port );
1852        stat->from               = atom->from;
1853        stat->progress           = peer->progress;
1854        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1855        stat->rateToPeer         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER );
1856        stat->rateToClient       = tr_peerGetPieceSpeed( peer, now, TR_PEER_TO_CLIENT );
1857        stat->peerIsChoked       = peer->peerIsChoked;
1858        stat->peerIsInterested   = peer->peerIsInterested;
1859        stat->clientIsChoked     = peer->clientIsChoked;
1860        stat->clientIsInterested = peer->clientIsInterested;
1861        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1862        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1863        stat->isUploadingTo      = clientIsUploadingTo( peer );
1864        stat->isSeed             = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
1865
1866        pch = stat->flagStr;
1867        if( t->optimistic == peer ) *pch++ = 'O';
1868        if( stat->isDownloadingFrom ) *pch++ = 'D';
1869        else if( stat->clientIsInterested ) *pch++ = 'd';
1870        if( stat->isUploadingTo ) *pch++ = 'U';
1871        else if( stat->peerIsInterested ) *pch++ = 'u';
1872        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
1873        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1874        if( stat->isEncrypted ) *pch++ = 'E';
1875        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1876        if( stat->isIncoming ) *pch++ = 'I';
1877        *pch = '\0';
1878    }
1879
1880    *setmeCount = size;
1881
1882    managerUnlock( manager );
1883    return ret;
1884}
1885
1886/**
1887***
1888**/
1889
1890struct ChokeData
1891{
1892    tr_bool         doUnchoke;
1893    tr_bool         isInterested;
1894    tr_bool         isChoked;
1895    int             rate;
1896    tr_peer *       peer;
1897};
1898
1899static int
1900compareChoke( const void * va,
1901              const void * vb )
1902{
1903    const struct ChokeData * a = va;
1904    const struct ChokeData * b = vb;
1905
1906    if( a->rate != b->rate ) /* prefer higher overall speeds */
1907        return a->rate > b->rate ? -1 : 1;
1908
1909    if( a->isChoked != b->isChoked ) /* prefer unchoked */
1910        return a->isChoked ? 1 : -1;
1911
1912    return 0;
1913}
1914
1915static int
1916isNew( const tr_peer * peer )
1917{
1918    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1919}
1920
1921static int
1922isSame( const tr_peer * peer )
1923{
1924    return peer && peer->client && strstr( peer->client, "Transmission" );
1925}
1926
1927/**
1928***
1929**/
1930
1931static void
1932rechoke( Torrent * t )
1933{
1934    int i, size, unchokedInterested;
1935    const int peerCount = tr_ptrArraySize( &t->peers );
1936    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1937    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1938    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
1939    const uint64_t now = tr_date( );
1940
1941    assert( torrentIsLocked( t ) );
1942
1943    /* sort the peers by preference and rate */
1944    for( i = 0, size = 0; i < peerCount; ++i )
1945    {
1946        tr_peer * peer = peers[i];
1947        struct peer_atom * atom = getExistingAtom( t, &peer->addr );
1948
1949        if( peer->progress >= 1.0 ) /* choke all seeds */
1950        {
1951            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1952        }
1953        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
1954        {
1955            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1956        }
1957        else if( chokeAll ) /* choke everyone if we're not uploading */
1958        {
1959            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1960        }
1961        else
1962        {
1963            struct ChokeData * n = &choke[size++];
1964            n->peer         = peer;
1965            n->isInterested = peer->peerIsInterested;
1966            n->isChoked     = peer->peerIsChoked;
1967            n->rate         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER ) * 1024;
1968        }
1969    }
1970
1971    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
1972
1973    /**
1974     * Reciprocation and number of uploads capping is managed by unchoking
1975     * the N peers which have the best upload rate and are interested.
1976     * This maximizes the client's download rate. These N peers are
1977     * referred to as downloaders, because they are interested in downloading
1978     * from the client.
1979     *
1980     * Peers which have a better upload rate (as compared to the downloaders)
1981     * but aren't interested get unchoked. If they become interested, the
1982     * downloader with the worst upload rate gets choked. If a client has
1983     * a complete file, it uses its upload rate rather than its download
1984     * rate to decide which peers to unchoke.
1985     */
1986    unchokedInterested = 0;
1987    for( i=0; i<size && unchokedInterested<MAX_UNCHOKED_PEERS; ++i ) {
1988        choke[i].doUnchoke = 1;
1989        if( choke[i].isInterested )
1990            ++unchokedInterested;
1991    }
1992
1993    /* optimistic unchoke */
1994    if( i < size )
1995    {
1996        int n;
1997        struct ChokeData * c;
1998        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
1999
2000        for( ; i<size; ++i )
2001        {
2002            if( choke[i].isInterested )
2003            {
2004                const tr_peer * peer = choke[i].peer;
2005                int x = 1, y;
2006                if( isNew( peer ) ) x *= 3;
2007                if( isSame( peer ) ) x *= 3;
2008                for( y=0; y<x; ++y )
2009                    tr_ptrArrayAppend( &randPool, &choke[i] );
2010            }
2011        }
2012
2013        if(( n = tr_ptrArraySize( &randPool )))
2014        {
2015            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
2016            c->doUnchoke = 1;
2017            t->optimistic = c->peer;
2018        }
2019
2020        tr_ptrArrayDestruct( &randPool, NULL );
2021    }
2022
2023    for( i=0; i<size; ++i )
2024        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
2025
2026    /* cleanup */
2027    tr_free( choke );
2028}
2029
2030static int
2031rechokePulse( void * vtorrent )
2032{
2033    Torrent * t = vtorrent;
2034
2035    torrentLock( t );
2036    rechoke( t );
2037    torrentUnlock( t );
2038    return TRUE;
2039}
2040
2041/***
2042****
2043****  Life and Death
2044****
2045***/
2046
2047static int
2048shouldPeerBeClosed( const Torrent * t,
2049                    const tr_peer * peer,
2050                    int             peerCount )
2051{
2052    const tr_torrent *       tor = t->tor;
2053    const time_t             now = time( NULL );
2054    const struct peer_atom * atom = getExistingAtom( t, &peer->addr );
2055
2056    /* if it's marked for purging, close it */
2057    if( peer->doPurge )
2058    {
2059        tordbg( t, "purging peer %s because its doPurge flag is set",
2060                tr_peerIoAddrStr( &atom->addr, atom->port ) );
2061        return TRUE;
2062    }
2063
2064    /* if we're seeding and the peer has everything we have,
2065     * and enough time has passed for a pex exchange, then disconnect */
2066    if( tr_torrentIsSeed( tor ) )
2067    {
2068        int peerHasEverything;
2069        if( atom->flags & ADDED_F_SEED_FLAG )
2070            peerHasEverything = TRUE;
2071        else if( peer->progress < tr_cpPercentDone( &tor->completion ) )
2072            peerHasEverything = FALSE;
2073        else {
2074            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( &tor->completion ) );
2075            tr_bitfieldDifference( tmp, peer->have );
2076            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2077            tr_bitfieldFree( tmp );
2078        }
2079
2080        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 )))
2081        {
2082            tordbg( t, "purging peer %s because we're both seeds",
2083                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
2084            return TRUE;
2085        }
2086    }
2087
2088    /* disconnect if it's been too long since piece data has been transferred.
2089     * this is on a sliding scale based on number of available peers... */
2090    {
2091        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2092        /* if we have >= relaxIfFewerThan, strictness is 100%.
2093         * if we have zero connections, strictness is 0% */
2094        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
2095            ? 1.0
2096            : peerCount / (float)relaxStrictnessIfFewerThanN;
2097        const int lo = MIN_UPLOAD_IDLE_SECS;
2098        const int hi = MAX_UPLOAD_IDLE_SECS;
2099        const int limit = hi - ( ( hi - lo ) * strictness );
2100        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
2101/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
2102        if( idleTime > limit ) {
2103            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
2104                       tr_peerIoAddrStr( &atom->addr, atom->port ), idleTime );
2105            return TRUE;
2106        }
2107    }
2108
2109    return FALSE;
2110}
2111
2112static tr_peer **
2113getPeersToClose( Torrent * t, int * setmeSize )
2114{
2115    int i, peerCount, outsize;
2116    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
2117    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2118
2119    assert( torrentIsLocked( t ) );
2120
2121    for( i = outsize = 0; i < peerCount; ++i )
2122        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
2123            ret[outsize++] = peers[i];
2124
2125    *setmeSize = outsize;
2126    return ret;
2127}
2128
2129static int
2130compareCandidates( const void * va,
2131                   const void * vb )
2132{
2133    const struct peer_atom * a = *(const struct peer_atom**) va;
2134    const struct peer_atom * b = *(const struct peer_atom**) vb;
2135
2136    /* <Charles> Here we would probably want to try reconnecting to
2137     * peers that had most recently given us data. Lots of users have
2138     * trouble with resets due to their routers and/or ISPs. This way we
2139     * can quickly recover from an unwanted reset. So we sort
2140     * piece_data_time in descending order.
2141     */
2142
2143    if( a->piece_data_time != b->piece_data_time )
2144        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2145
2146    if( a->numFails != b->numFails )
2147        return a->numFails < b->numFails ? -1 : 1;
2148
2149    if( a->time != b->time )
2150        return a->time < b->time ? -1 : 1;
2151
2152    /* all other things being equal, prefer peers whose
2153     * information comes from a more reliable source */
2154    if( a->from != b->from )
2155        return a->from < b->from ? -1 : 1;
2156
2157    return 0;
2158}
2159
2160static int
2161getReconnectIntervalSecs( const struct peer_atom * atom )
2162{
2163    int          sec;
2164    const time_t now = time( NULL );
2165
2166    /* if we were recently connected to this peer and transferring piece
2167     * data, try to reconnect to them sooner rather that later -- we don't
2168     * want network troubles to get in the way of a good peer. */
2169    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2170        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2171
2172    /* don't allow reconnects more often than our minimum */
2173    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2174        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2175
2176    /* otherwise, the interval depends on how many times we've tried
2177     * and failed to connect to the peer */
2178    else switch( atom->numFails ) {
2179        case 0: sec = 0; break;
2180        case 1: sec = 5; break;
2181        case 2: sec = 2 * 60; break;
2182        case 3: sec = 15 * 60; break;
2183        case 4: sec = 30 * 60; break;
2184        case 5: sec = 60 * 60; break;
2185        default: sec = 120 * 60; break;
2186    }
2187
2188    return sec;
2189}
2190
2191static struct peer_atom **
2192getPeerCandidates( Torrent * t, int * setmeSize )
2193{
2194    int                 i, atomCount, retCount;
2195    struct peer_atom ** atoms;
2196    struct peer_atom ** ret;
2197    const time_t        now = time( NULL );
2198    const int           seed = tr_torrentIsSeed( t->tor );
2199
2200    assert( torrentIsLocked( t ) );
2201
2202    atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
2203    ret = tr_new( struct peer_atom*, atomCount );
2204    for( i = retCount = 0; i < atomCount; ++i )
2205    {
2206        int                interval;
2207        struct peer_atom * atom = atoms[i];
2208
2209        /* peer fed us too much bad data ... we only keep it around
2210         * now to weed it out in case someone sends it to us via pex */
2211        if( atom->myflags & MYFLAG_BANNED )
2212            continue;
2213
2214        /* peer was unconnectable before, so we're not going to keep trying.
2215         * this is needs a separate flag from `banned', since if they try
2216         * to connect to us later, we'll let them in */
2217        if( atom->myflags & MYFLAG_UNREACHABLE )
2218            continue;
2219
2220        /* we don't need two connections to the same peer... */
2221        if( peerIsInUse( t, &atom->addr ) )
2222            continue;
2223
2224        /* no need to connect if we're both seeds... */
2225        if( seed && ( ( atom->flags & ADDED_F_SEED_FLAG ) ||
2226                      ( atom->uploadOnly == UPLOAD_ONLY_YES ) ) )
2227            continue;
2228
2229        /* don't reconnect too often */
2230        interval = getReconnectIntervalSecs( atom );
2231        if( ( now - atom->time ) < interval )
2232        {
2233            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2234                    i, tr_peerIoAddrStr( &atom->addr, atom->port ), interval );
2235            continue;
2236        }
2237
2238        /* Don't connect to peers in our blocklist */
2239        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2240            continue;
2241
2242        ret[retCount++] = atom;
2243    }
2244
2245    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2246    *setmeSize = retCount;
2247    return ret;
2248}
2249
2250static int
2251reconnectPulse( void * vtorrent )
2252{
2253    Torrent *     t = vtorrent;
2254    static time_t prevTime = 0;
2255    static int    newConnectionsThisSecond = 0;
2256    time_t        now;
2257
2258    torrentLock( t );
2259
2260    now = time( NULL );
2261    if( prevTime != now )
2262    {
2263        prevTime = now;
2264        newConnectionsThisSecond = 0;
2265    }
2266
2267    if( !t->isRunning )
2268    {
2269        removeAllPeers( t );
2270    }
2271    else
2272    {
2273        int i, nCandidates, nBad;
2274        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
2275        struct tr_peer ** connections = getPeersToClose( t, &nBad );
2276
2277        //if( nBad || nCandidates )
2278            tordbg( t, "reconnect pulse for [%s]: %d bad connections, "
2279                    "%d connection candidates, %d atoms, max per pulse is %d",
2280                    t->tor->info.name, nBad, nCandidates,
2281                    tr_ptrArraySize( &t->pool ),
2282                    (int)MAX_RECONNECTIONS_PER_PULSE );
2283
2284        /* disconnect some peers.
2285           if we transferred piece data, then they might be good peers,
2286           so reset their `numFails' weight to zero.  otherwise we connected
2287           to them fruitlessly, so mark it as another fail */
2288        for( i = 0; i < nBad; ++i ) {
2289            tr_peer * peer = connections[i];
2290            struct peer_atom * atom = getExistingAtom( t, &peer->addr );
2291            if( atom->piece_data_time )
2292                atom->numFails = 0;
2293            else
2294                ++atom->numFails;
2295            tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
2296            removePeer( t, peer );
2297        }
2298
2299tordbg( t, "nCandidates is %d, MAX_RECONNECTIONS_PER_PULSE is %d, getPeerCount(t) is %d, getMaxPeerCount(t) is %d, newConnectionsThisSecond is %d, MAX_CONNECTIONS_PER_SECOND is %d",
2300        (int)nCandidates, (int)MAX_RECONNECTIONS_PER_PULSE, (int)getPeerCount( t ), (int)getMaxPeerCount( t->tor ), (int)newConnectionsThisSecond, (int)MAX_CONNECTIONS_PER_SECOND );
2301
2302        /* add some new ones */
2303        for( i = 0;    ( i < nCandidates )
2304           && ( i < MAX_RECONNECTIONS_PER_PULSE )
2305           && ( getPeerCount( t ) < getMaxPeerCount( t->tor ) )
2306           && ( newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND ); ++i )
2307        {
2308            tr_peerMgr *       mgr = t->manager;
2309            struct peer_atom * atom = candidates[i];
2310            tr_peerIo *        io;
2311
2312            tordbg( t, "Starting an OUTGOING connection with %s",
2313                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2314
2315            io = tr_peerIoNewOutgoing( mgr->session, mgr->session->bandwidth, &atom->addr, atom->port, t->hash );
2316
2317            if( io == NULL )
2318            {
2319                tordbg( t, "peerIo not created; marking peer %s as unreachable",
2320                        tr_peerIoAddrStr( &atom->addr, atom->port ) );
2321                atom->myflags |= MYFLAG_UNREACHABLE;
2322            }
2323            else
2324            {
2325                tr_handshake * handshake = tr_handshakeNew( io,
2326                                                            mgr->session->encryptionMode,
2327                                                            myHandshakeDoneCB,
2328                                                            mgr );
2329
2330                assert( tr_peerIoGetTorrentHash( io ) );
2331
2332                tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewOutgoing() */
2333
2334                ++newConnectionsThisSecond;
2335
2336                tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
2337                                         handshakeCompare );
2338            }
2339
2340            atom->time = time( NULL );
2341        }
2342
2343        /* cleanup */
2344        tr_free( connections );
2345        tr_free( candidates );
2346    }
2347
2348    torrentUnlock( t );
2349    return TRUE;
2350}
2351
2352/****
2353*****
2354*****  BANDWIDTH ALLOCATION
2355*****
2356****/
2357
2358static void
2359pumpAllPeers( tr_peerMgr * mgr )
2360{
2361    const int torrentCount = tr_ptrArraySize( &mgr->torrents );
2362    int       i, j;
2363
2364    for( i=0; i<torrentCount; ++i )
2365    {
2366        Torrent * t = tr_ptrArrayNth( &mgr->torrents, i );
2367        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
2368        {
2369            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
2370            tr_peerMsgsPulse( peer->msgs );
2371        }
2372    }
2373}
2374
2375static int
2376bandwidthPulse( void * vmgr )
2377{
2378    tr_peerMgr * mgr = vmgr;
2379    managerLock( mgr );
2380
2381    /* FIXME: this next line probably isn't necessary... */
2382    pumpAllPeers( mgr );
2383
2384    /* allocate bandwidth to the peers */
2385    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
2386    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
2387
2388    managerUnlock( mgr );
2389    return TRUE;
2390}
Note: See TracBrowser for help on using the repository browser.