source: trunk/libtransmission/peer-mgr.c @ 5632

Last change on this file since 5632 was 5632, checked in by charles, 14 years ago

(1) fix choke/unchoke error. (2) if a peer keeps trying to reconnect to us, hang up during the handshake.

  • Property svn:keywords set to Date Rev Author Id
File size: 57.0 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 5632 2008-04-17 03:48:56Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp, strstr */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17#include <limits.h> /* INT_MAX */
18
19#include <libgen.h> /* basename */
20
21#include <event.h>
22
23#include "transmission.h"
24#include "blocklist.h"
25#include "clients.h"
26#include "completion.h"
27#include "crypto.h"
28#include "handshake.h"
29#include "net.h"
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-mgr-private.h"
33#include "peer-msgs.h"
34#include "ptrarray.h"
35#include "ratecontrol.h"
36#include "torrent.h"
37#include "trcompat.h" /* strlcpy */
38#include "trevent.h"
39#include "utils.h"
40
41/**
42*** The "SWIFT" system is described by Karthik Tamilmani,
43*** Vinay Pai, and Alexander Mohr of Stony Brook University
44*** in their paper "SWIFT: A System With Incentives For Trading"
45*** http://citeseer.ist.psu.edu/tamilmani04swift.html
46***
47*** More SWIFT constants are defined in peer-mgr-private.h
48**/
49
50/**
51 * Allow new peers to download this many bytes from
52 * us when getting started.  This can prevent gridlock
53 * with other peers using tit-for-tat algorithms
54 */
55static const int SWIFT_INITIAL_CREDIT = 64 * 1024; /* 64 KiB */
56
57/**
58 * We expend a fraction of our torrent's total upload speed
59 * on largesse by uniformly distributing free credit to
60 * all of our peers.  This too helps prevent gridlock.
61 */
62static const double SWIFT_LARGESSE = 0.15; /* 15% of our UL */
63
64/**
65 * How frequently to extend largesse-based credit
66 */
67static const int SWIFT_PERIOD_MSEC = 5000;
68
69
70enum
71{
72    /* how frequently to change which peers are choked */
73    RECHOKE_PERIOD_MSEC = (10 * 1000),
74
75    /* how frequently to refill peers' request lists */
76    REFILL_PERIOD_MSEC = 666,
77
78    /* following the BT spec, we consider ourselves `snubbed' if
79     * we're we don't get piece data from a peer in this long */
80    SNUBBED_SEC = 60,
81
82    /* when many peers are available, keep idle ones this long */
83    MIN_UPLOAD_IDLE_SECS = (60 * 3),
84
85    /* when few peers are available, keep idle ones this long */
86    MAX_UPLOAD_IDLE_SECS = (60 * 10),
87
88    /* how frequently to decide which peers live and die */
89    RECONNECT_PERIOD_MSEC = (2 * 1000),
90
91    /* max # of peers to ask fer per torrent per reconnect pulse */
92    MAX_RECONNECTIONS_PER_PULSE = 1,
93
94    /* max number of peers to ask for per second overall.
95     * this throttle is to avoid overloading the router */
96    MAX_CONNECTIONS_PER_SECOND = 8,
97
98    /* number of unchoked peers per torrent */
99    MAX_UNCHOKED_PEERS = 12,
100
101    /* corresponds to ut_pex's added.f flags */
102    ADDED_F_ENCRYPTION_FLAG = 1,
103
104    /* corresponds to ut_pex's added.f flags */
105    ADDED_F_SEED_FLAG = 2,
106
107    /* number of bad pieces a peer is allowed to send before we ban them */
108    MAX_BAD_PIECES_PER_PEER = 3,
109
110    /* use for bitwise operations w/peer_atom.myflags */
111    MYFLAG_BANNED = 1,
112
113    /* unreachable for now... but not banned.  if they try to connect to us it's okay */
114    MYFLAG_UNREACHABLE = 2
115};
116
117
118/**
119***
120**/
121
122/* We keep one of these for every peer we know about, whether
123 * it's connected or not, so the struct must be small.
124 * When our current connections underperform, we dip back
125 * into this list for new ones. */
126struct peer_atom
127{   
128    uint8_t from;
129    uint8_t flags; /* these match the added_f flags */
130    uint8_t myflags; /* flags that aren't defined in added_f */
131    uint16_t port;
132    uint16_t numFails;
133    struct in_addr addr;
134    time_t time;
135    time_t piece_data_time;
136};
137
138typedef struct
139{
140    uint8_t hash[SHA_DIGEST_LENGTH];
141    tr_ptrArray * outgoingHandshakes; /* tr_handshake */
142    tr_ptrArray * pool; /* struct peer_atom */
143    tr_ptrArray * peers; /* tr_peer */
144    tr_timer * reconnectTimer;
145    tr_timer * rechokeTimer;
146    tr_timer * refillTimer;
147    tr_timer * swiftTimer;
148    tr_torrent * tor;
149    tr_peer * optimistic; /* the optimistic peer, or NULL if none */
150    tr_bitfield * requested;
151
152    unsigned int isRunning : 1;
153
154    struct tr_peerMgr * manager;
155}
156Torrent;
157
158struct tr_peerMgr
159{
160    tr_handle * handle;
161    tr_ptrArray * torrents; /* Torrent */
162    tr_ptrArray * incomingHandshakes; /* tr_handshake */
163};
164
165/**
166***
167**/
168
169static void
170myDebug( const char * file, int line, const Torrent * t, const char * fmt, ... )
171{
172    FILE * fp = tr_getLog( );
173    if( fp != NULL )
174    {
175        va_list args;
176        char timestr[64];
177        struct evbuffer * buf = evbuffer_new( );
178        char * myfile = tr_strdup( file );
179
180        evbuffer_add_printf( buf, "[%s] ", tr_getLogTimeStr( timestr, sizeof(timestr) ) );
181        if( t != NULL )
182            evbuffer_add_printf( buf, "%s ", t->tor->info.name );
183        va_start( args, fmt );
184        evbuffer_add_vprintf( buf, fmt, args );
185        va_end( args );
186        evbuffer_add_printf( buf, " (%s:%d)\n", basename(myfile), line );
187        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
188
189        tr_free( myfile );
190        evbuffer_free( buf );
191    }
192}
193
194#define tordbg(t, fmt...) myDebug(__FILE__, __LINE__, t, ##fmt )
195
196/**
197***
198**/
199
200static void
201managerLock( const struct tr_peerMgr * manager )
202{
203    tr_globalLock( manager->handle );
204}
205static void
206managerUnlock( const struct tr_peerMgr * manager )
207{
208    tr_globalUnlock( manager->handle );
209}
210static void
211torrentLock( Torrent * torrent )
212{
213    managerLock( torrent->manager );
214}
215static void
216torrentUnlock( Torrent * torrent )
217{
218    managerUnlock( torrent->manager );
219}
220static int
221torrentIsLocked( const Torrent * t )
222{
223    return tr_globalIsLocked( t->manager->handle );
224}
225
226/**
227***
228**/
229
230static int
231compareAddresses( const struct in_addr * a, const struct in_addr * b )
232{
233    return tr_compareUint32( a->s_addr, b->s_addr );
234}
235
236static int
237handshakeCompareToAddr( const void * va, const void * vb )
238{
239    const tr_handshake * a = va;
240    return compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
241}
242
243static int
244handshakeCompare( const void * a, const void * b )
245{
246    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
247}
248
249static tr_handshake*
250getExistingHandshake( tr_ptrArray * handshakes, const struct in_addr * in_addr )
251{
252    return tr_ptrArrayFindSorted( handshakes,
253                                  in_addr,
254                                  handshakeCompareToAddr );
255}
256
257static int
258comparePeerAtomToAddress( const void * va, const void * vb )
259{
260    const struct peer_atom * a = va;
261    return compareAddresses( &a->addr, vb );
262}
263
264static int
265comparePeerAtoms( const void * va, const void * vb )
266{
267    const struct peer_atom * b = vb;
268    return comparePeerAtomToAddress( va, &b->addr );
269}
270
271/**
272***
273**/
274
275static int
276torrentCompare( const void * va, const void * vb )
277{
278    const Torrent * a = va;
279    const Torrent * b = vb;
280    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
281}
282
283static int
284torrentCompareToHash( const void * va, const void * vb )
285{
286    const Torrent * a = va;
287    const uint8_t * b_hash = vb;
288    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
289}
290
291static Torrent*
292getExistingTorrent( tr_peerMgr * manager, const uint8_t * hash )
293{
294    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
295                                             hash,
296                                             torrentCompareToHash );
297}
298
299static int
300peerCompare( const void * va, const void * vb )
301{
302    const tr_peer * a = va;
303    const tr_peer * b = vb;
304    return compareAddresses( &a->in_addr, &b->in_addr );
305}
306
307static int
308peerCompareToAddr( const void * va, const void * vb )
309{
310    const tr_peer * a = va;
311    return compareAddresses( &a->in_addr, vb );
312}
313
314static tr_peer*
315getExistingPeer( Torrent * torrent, const struct in_addr * in_addr )
316{
317    assert( torrentIsLocked( torrent ) );
318    assert( in_addr != NULL );
319
320    return (tr_peer*) tr_ptrArrayFindSorted( torrent->peers,
321                                             in_addr,
322                                             peerCompareToAddr );
323}
324
325static struct peer_atom*
326getExistingAtom( const Torrent * t, const struct in_addr * addr )
327{
328    assert( torrentIsLocked( t ) );
329    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
330}
331
332static int
333peerIsInUse( const Torrent * ct, const struct in_addr * addr )
334{
335    Torrent * t = (Torrent*) ct;
336
337    assert( torrentIsLocked ( t ) );
338
339    return getExistingPeer( t, addr )
340        || getExistingHandshake( t->outgoingHandshakes, addr )
341        || getExistingHandshake( t->manager->incomingHandshakes, addr );
342}
343
344static tr_peer*
345peerConstructor( const struct in_addr * in_addr )
346{
347    tr_peer * p;
348    p = tr_new0( tr_peer, 1 );
349    p->credit = SWIFT_INITIAL_CREDIT;
350    p->rcToClient = tr_rcInit( );
351    p->rcToPeer = tr_rcInit( );
352    memcpy( &p->in_addr, in_addr, sizeof(struct in_addr) );
353    return p;
354}
355
356static tr_peer*
357getPeer( Torrent * torrent, const struct in_addr * in_addr )
358{
359    tr_peer * peer;
360
361    assert( torrentIsLocked( torrent ) );
362
363    peer = getExistingPeer( torrent, in_addr );
364
365    if( peer == NULL ) {
366        peer = peerConstructor( in_addr );
367        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
368    }
369
370    return peer;
371}
372
373static void
374peerDestructor( tr_peer * peer )
375{
376    assert( peer != NULL );
377    assert( peer->msgs != NULL );
378
379    tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
380    tr_peerMsgsFree( peer->msgs );
381
382    tr_peerIoFree( peer->io );
383
384    tr_bitfieldFree( peer->have );
385    tr_bitfieldFree( peer->blame );
386    tr_rcClose( peer->rcToClient );
387    tr_rcClose( peer->rcToPeer );
388    tr_free( peer->client );
389    tr_free( peer );
390}
391
392static void
393removePeer( Torrent * t, tr_peer * peer )
394{
395    tr_peer * removed;
396    struct peer_atom * atom;
397
398    assert( torrentIsLocked( t ) );
399
400    atom = getExistingAtom( t, &peer->in_addr );
401    assert( atom != NULL );
402    atom->time = time( NULL );
403
404    removed = tr_ptrArrayRemoveSorted  ( t->peers, peer, peerCompare );
405    assert( removed == peer );
406    peerDestructor( removed );
407}
408
409static void
410removeAllPeers( Torrent * t )
411{
412    while( !tr_ptrArrayEmpty( t->peers ) )
413        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
414}
415
416static void
417torrentDestructor( Torrent * t )
418{
419    uint8_t hash[SHA_DIGEST_LENGTH];
420
421    assert( t != NULL );
422    assert( !t->isRunning );
423    assert( t->peers != NULL );
424    assert( torrentIsLocked( t ) );
425    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
426    assert( tr_ptrArrayEmpty( t->peers ) );
427
428    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
429
430    tr_timerFree( &t->reconnectTimer );
431    tr_timerFree( &t->rechokeTimer );
432    tr_timerFree( &t->refillTimer );
433    tr_timerFree( &t->swiftTimer );
434
435    tr_bitfieldFree( t->requested );
436    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
437    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
438    tr_ptrArrayFree( t->peers, NULL );
439
440    tr_free( t );
441}
442
443static Torrent*
444torrentConstructor( tr_peerMgr * manager, tr_torrent * tor )
445{
446    Torrent * t;
447
448    t = tr_new0( Torrent, 1 );
449    t->manager = manager;
450    t->tor = tor;
451    t->pool = tr_ptrArrayNew( );
452    t->peers = tr_ptrArrayNew( );
453    t->outgoingHandshakes = tr_ptrArrayNew( );
454    t->requested = tr_bitfieldNew( tor->blockCount );
455    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
456
457    return t;
458}
459
460/**
461 * For explanation, see http://www.bittorrent.org/fast_extensions.html
462 * Also see the "test-allowed-set" unit test
463 */
464struct tr_bitfield *
465tr_peerMgrGenerateAllowedSet( const uint32_t         k,         /* number of pieces in set */
466                              const uint32_t         sz,        /* number of pieces in torrent */
467                              const uint8_t        * infohash,  /* torrent's SHA1 hash*/
468                              const struct in_addr * ip )       /* peer's address */
469{
470    uint8_t w[SHA_DIGEST_LENGTH + 4];
471    uint8_t x[SHA_DIGEST_LENGTH];
472    tr_bitfield_t * a;
473    uint32_t a_size;
474
475    *(uint32_t*)w = ntohl( htonl(ip->s_addr) & 0xffffff00 );   /* (1) */
476    memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );              /* (2) */
477    tr_sha1( x, w, sizeof( w ), NULL );                        /* (3) */
478
479    a = tr_bitfieldNew( sz );
480    a_size = 0;
481   
482    while( a_size < k )
483    {
484        int i;
485        for ( i=0; i<5 && a_size<k; ++i )                      /* (4) */
486        {
487            uint32_t j = i * 4;                                /* (5) */
488            uint32_t y = ntohl(*(uint32_t*)(x+j));             /* (6) */
489            uint32_t index = y % sz;                           /* (7) */
490            if ( !tr_bitfieldHas( a, index ) ) {               /* (8) */
491                tr_bitfieldAdd( a, index );                    /* (9) */
492                ++a_size;
493            }
494        }
495        tr_sha1( x, x, sizeof( x ), NULL );                    /* (3) */
496    }
497   
498    return a;
499}
500
501tr_peerMgr*
502tr_peerMgrNew( tr_handle * handle )
503{
504    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
505    m->handle = handle;
506    m->torrents = tr_ptrArrayNew( );
507    m->incomingHandshakes = tr_ptrArrayNew( );
508    return m;
509}
510
511void
512tr_peerMgrFree( tr_peerMgr * manager )
513{
514    managerLock( manager );
515
516    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
517     * the item from manager->handshakes, so this is a little roundabout... */
518    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
519        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
520    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
521
522    /* free the torrents. */
523    tr_ptrArrayFree( manager->torrents, (PtrArrayForeachFunc)torrentDestructor );
524
525    managerUnlock( manager );
526    tr_free( manager );
527}
528
529static tr_peer**
530getConnectedPeers( Torrent * t, int * setmeCount )
531{
532    int i, peerCount, connectionCount;
533    tr_peer **peers;
534    tr_peer **ret;
535
536    assert( torrentIsLocked( t ) );
537
538    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
539    ret = tr_new( tr_peer*, peerCount );
540
541    for( i=connectionCount=0; i<peerCount; ++i )
542        if( peers[i]->msgs != NULL )
543            ret[connectionCount++] = peers[i];
544
545    *setmeCount = connectionCount;
546    return ret;
547}
548
549static int
550clientIsDownloadingFrom( const tr_peer * peer )
551{
552    return peer->clientIsInterested && !peer->clientIsChoked;
553}
554
555static int
556clientIsUploadingTo( const tr_peer * peer )
557{
558    return peer->peerIsInterested && !peer->peerIsChoked;
559}
560
561/***
562****
563***/
564
565int
566tr_peerMgrPeerIsSeed( const tr_peerMgr       * mgr,
567                      const uint8_t          * torrentHash,
568                      const struct in_addr   * addr )
569{
570    int isSeed = FALSE;
571    const Torrent * t = NULL;
572    const struct peer_atom * atom = NULL;
573
574    t = getExistingTorrent( (tr_peerMgr*)mgr, torrentHash );
575    if( t )
576        atom = getExistingAtom( t, addr );
577    if( atom )
578        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
579
580    return isSeed;
581}
582
583/***
584****  Refill
585***/
586
587struct tr_refill_piece
588{
589    tr_priority_t priority;
590    int missingBlockCount;
591    uint16_t random;
592    uint32_t piece;
593    uint32_t peerCount;
594};
595
596static int
597compareRefillPiece (const void * aIn, const void * bIn)
598{
599    const struct tr_refill_piece * a = aIn;
600    const struct tr_refill_piece * b = bIn;
601
602    /* fewer missing pieces goes first */
603    if( a->missingBlockCount != b->missingBlockCount )
604        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
605   
606    /* if one piece has a higher priority, it goes first */
607    if( a->priority != b->priority )
608        return a->priority > b->priority ? -1 : 1;
609   
610    /* otherwise if one has fewer peers, it goes first */
611    if (a->peerCount != b->peerCount)
612        return a->peerCount < b->peerCount ? -1 : 1;
613
614    /* otherwise go with our random seed */
615    return tr_compareUint16( a->random, b->random );
616}
617
618static int
619isPieceInteresting( const tr_torrent  * tor,
620                    tr_piece_index_t    piece )
621{
622    if( tor->info.pieces[piece].dnd ) /* we don't want it */
623        return 0;
624
625    if( tr_cpPieceIsComplete( tor->completion, piece ) ) /* we have it */
626        return 0;
627
628    return 1;
629}
630
631static uint32_t*
632getPreferredPieces( Torrent     * t,
633                    uint32_t    * pieceCount )
634{
635    const tr_torrent * tor = t->tor;
636    const tr_info * inf = &tor->info;
637    tr_piece_index_t i;
638    uint32_t poolSize = 0;
639    uint32_t * pool = tr_new( uint32_t, inf->pieceCount );
640    int peerCount;
641    tr_peer** peers;
642
643    assert( torrentIsLocked( t ) );
644
645    peers = getConnectedPeers( t, &peerCount );
646
647    for( i=0; i<inf->pieceCount; ++i )
648        if( isPieceInteresting( tor, i ) )
649            pool[poolSize++] = i;
650
651    /* sort the pool from most interesting to least... */
652    if( poolSize > 1 )
653    {
654        uint32_t j;
655        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
656
657        for( j=0; j<poolSize; ++j )
658        {
659            int k;
660            const int piece = pool[j];
661            struct tr_refill_piece * setme = p + j;
662
663            setme->piece = piece;
664            setme->priority = inf->pieces[piece].priority;
665            setme->peerCount = 0;
666            setme->random = tr_rand( UINT16_MAX );
667            setme->missingBlockCount = tr_cpMissingBlocksInPiece( tor->completion, piece );
668
669            for( k=0; k<peerCount; ++k ) {
670                const tr_peer * peer = peers[k];
671                if( peer->peerIsInterested && !peer->clientIsChoked && tr_bitfieldHas( peer->have, piece ) )
672                    ++setme->peerCount;
673            }
674        }
675
676        qsort( p, poolSize, sizeof(struct tr_refill_piece), compareRefillPiece );
677
678        for( j=0; j<poolSize; ++j )
679            pool[j] = p[j].piece;
680
681        tr_free( p );
682    }
683
684    tr_free( peers );
685
686    *pieceCount = poolSize;
687    return pool;
688}
689
690static uint64_t*
691getPreferredBlocks( Torrent * t, tr_block_index_t * setmeCount )
692{
693    int s;
694    uint32_t i;
695    uint32_t pieceCount;
696    uint32_t blockCount;
697    uint32_t unreqCount[3], reqCount[3];
698    uint32_t * pieces;
699    uint64_t * ret, * walk;
700    uint64_t * unreq[3], *req[3];
701    const tr_torrent * tor = t->tor;
702
703    assert( torrentIsLocked( t ) );
704
705    pieces = getPreferredPieces( t, &pieceCount );
706
707    /**
708     * Now we walk through those preferred pieces to find all the blocks
709     * are still missing from them.  We put unrequested blocks first,
710     * of course, but by including requested blocks afterwards, endgame
711     * handling happens naturally.
712     *
713     * By doing this once per priority we also effectively get an endgame
714     * mode for each priority level.  The helps keep high priority files
715     * from getting stuck at 99% due of unresponsive peers.
716     */
717
718    /* make temporary bins for the four tiers of blocks */
719    for( i=0; i<3; ++i ) {
720        req[i] = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
721        reqCount[i] = 0;
722        unreq[i] = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
723        unreqCount[i] = 0;
724    }
725
726    /* sort the blocks into our temp bins */
727    for( i=blockCount=0; i<pieceCount; ++i )
728    {
729        const tr_piece_index_t index = pieces[i];
730        const int priorityIndex = tor->info.pieces[index].priority + 1;
731        const tr_block_index_t begin = tr_torPieceFirstBlock( tor, index );
732        const tr_block_index_t end = begin + tr_torPieceCountBlocks( tor, index );
733        tr_block_index_t block;
734
735        for( block=begin; block<end; ++block )
736        {
737            if( tr_cpBlockIsComplete( tor->completion, block ) )
738                continue;
739
740            ++blockCount;
741
742            if( tr_bitfieldHas( t->requested, block ) )
743            {
744                const uint32_t n = reqCount[priorityIndex]++;
745                req[priorityIndex][n] = block;
746            }
747            else
748            {
749                const uint32_t n = unreqCount[priorityIndex]++;
750                unreq[priorityIndex][n] = block;
751            }
752        }
753    }
754
755    /* join the bins together, going from highest priority to lowest so
756     * the the blocks we want to request first will be first in the list */
757    ret = walk = tr_new( uint64_t, blockCount );
758    for( s=2; s>=0; --s ) {
759        memcpy( walk, unreq[s], sizeof(uint64_t) * unreqCount[s] );
760        walk += unreqCount[s];
761        memcpy( walk, req[s], sizeof(uint64_t) * reqCount[s] );
762        walk += reqCount[s];
763    }
764    assert( ( walk - ret ) == ( int )blockCount );
765    *setmeCount = blockCount;
766
767    /* cleanup */
768    tr_free( pieces );
769    for( i=0; i<3; ++i ) {
770        tr_free( unreq[i] );
771        tr_free( req[i] );
772    }
773    return ret;
774}
775
776static tr_peer**
777getPeersUploadingToClient( Torrent * t, int * setmeCount )
778{
779    int i;
780    int peerCount = 0;
781    int retCount = 0;
782    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
783    tr_peer ** ret = tr_new( tr_peer*, peerCount );
784
785    /* get a list of peers we're downloading from */
786    for( i=0; i<peerCount; ++i )
787        if( clientIsDownloadingFrom( peers[i] ) )
788            ret[retCount++] = peers[i];
789
790    /* pick a different starting point each time so all peers
791     * get a chance at the first blocks in the queue */
792    if( retCount ) {
793        tr_peer ** tmp = tr_new( tr_peer*, retCount );
794        i = tr_rand( retCount );
795        memcpy( tmp, ret, sizeof(tr_peer*) * retCount );
796        memcpy( ret, tmp+i, sizeof(tr_peer*) * (retCount-i) );
797        memcpy( ret+(retCount-i), tmp, sizeof(tr_peer*) * i );
798        tr_free( tmp );
799    }
800
801    *setmeCount = retCount;
802    return ret;
803}
804
805static int
806refillPulse( void * vtorrent )
807{
808    Torrent * t = vtorrent;
809    tr_torrent * tor = t->tor;
810    tr_block_index_t i;
811    int peerCount;
812    tr_peer ** peers;
813    tr_block_index_t blockCount;
814    uint64_t * blocks;
815
816    if( !t->isRunning )
817        return TRUE;
818    if( tr_torrentIsSeed( t->tor ) )
819        return TRUE;
820
821    torrentLock( t );
822    tordbg( t, "Refilling Request Buffers..." );
823
824    blocks = getPreferredBlocks( t, &blockCount );
825    peers = getPeersUploadingToClient( t, &peerCount );
826
827    for( i=0; peerCount && i<blockCount; ++i )
828    {
829        int j;
830
831        const tr_block_index_t block = blocks[i];
832        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
833        const uint32_t begin = (block * tor->blockSize) - (index * tor->info.pieceSize);
834        const uint32_t length = tr_torBlockCountBytes( tor, block );
835
836        assert( tr_torrentReqIsValid( tor, index, begin, length ) );
837        assert( _tr_block( tor, index, begin ) == block );
838        assert( begin < tr_torPieceCountBytes( tor, index ) );
839        assert( (begin + length) <= tr_torPieceCountBytes( tor, index ) );
840
841        /* find a peer who can ask for this block */
842        for( j=0; j<peerCount; )
843        {
844            const int val = tr_peerMsgsAddRequest( peers[j]->msgs, index, begin, length );
845            switch( val )
846            {
847                case TR_ADDREQ_FULL: 
848                case TR_ADDREQ_CLIENT_CHOKED:
849                    memmove( peers+j, peers+j+1, sizeof(tr_peer*)*(--peerCount-j) );
850                    break;
851
852                case TR_ADDREQ_MISSING: 
853                case TR_ADDREQ_DUPLICATE: 
854                    ++j;
855                    break;
856
857                case TR_ADDREQ_OK:
858                    tr_bitfieldAdd( t->requested, block );
859                    j = peerCount;
860                    break;
861
862                default:
863                    assert( 0 && "unhandled value" );
864                    break;
865            }
866        }
867    }
868
869    /* cleanup */
870    tr_free( peers );
871    tr_free( blocks );
872
873    t->refillTimer = NULL;
874    torrentUnlock( t );
875    return FALSE;
876}
877
878static void
879broadcastClientHave( Torrent * t, uint32_t index )
880{
881    int i, size;
882    tr_peer ** peers;
883
884    assert( torrentIsLocked( t ) );
885
886    peers = getConnectedPeers( t, &size );
887    for( i=0; i<size; ++i )
888        tr_peerMsgsHave( peers[i]->msgs, index );
889    tr_free( peers );
890}
891
892static void
893broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
894{
895    int i, size;
896    tr_peer ** peers;
897
898    assert( torrentIsLocked( t ) );
899
900    peers = getConnectedPeers( t, &size );
901    for( i=0; i<size; ++i )
902        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
903    tr_free( peers );
904}
905
906static void
907addStrike( Torrent * t, tr_peer * peer )
908{
909    tordbg( t, "increasing peer %s strike count to %d", tr_peerIoAddrStr(&peer->in_addr,peer->port), peer->strikes+1 );
910
911    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
912    {
913        struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
914        atom->myflags |= MYFLAG_BANNED;
915        peer->doPurge = 1;
916        tordbg( t, "banning peer %s", tr_peerIoAddrStr(&atom->addr,atom->port) );
917    }
918}
919
920static void
921msgsCallbackFunc( void * vpeer, void * vevent, void * vt )
922{
923    tr_peer * peer = vpeer;
924    Torrent * t = (Torrent *) vt;
925    const tr_peermsgs_event * e = (const tr_peermsgs_event *) vevent;
926
927    torrentLock( t );
928
929    switch( e->eventType )
930    {
931        case TR_PEERMSG_NEED_REQ:
932            if( t->refillTimer == NULL )
933                t->refillTimer = tr_timerNew( t->manager->handle,
934                                              refillPulse, t,
935                                              REFILL_PERIOD_MSEC );
936            break;
937
938        case TR_PEERMSG_CANCEL:
939            tr_bitfieldRem( t->requested, _tr_block( t->tor, e->pieceIndex, e->offset ) );
940            break;
941
942        case TR_PEERMSG_PIECE_DATA: {
943            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
944            atom->piece_data_time = time( NULL );
945            break;
946        }
947
948        case TR_PEERMSG_CLIENT_HAVE:
949            broadcastClientHave( t, e->pieceIndex );
950            tr_torrentRecheckCompleteness( t->tor );
951            break;
952
953        case TR_PEERMSG_PEER_PROGRESS: {
954            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
955            const int peerIsSeed = e->progress >= 1.0;
956            if( peerIsSeed ) {
957                tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr(&atom->addr,atom->port) );
958                atom->flags |= ADDED_F_SEED_FLAG;
959            } else {
960                tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr(&atom->addr,atom->port) );
961                atom->flags &= ~ADDED_F_SEED_FLAG;
962            } break;
963        }
964
965        case TR_PEERMSG_CLIENT_BLOCK:
966            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
967            break;
968
969        case TR_PEERMSG_ERROR:
970            if( TR_ERROR_IS_IO( e->err ) ) {
971                t->tor->error = e->err;
972                strlcpy( t->tor->errorString, tr_errorString( e->err ), sizeof(t->tor->errorString) );
973                tr_torrentStop( t->tor );
974            } else if( e->err == TR_ERROR_ASSERT ) {
975                addStrike( t, peer );
976            }
977            peer->doPurge = 1;
978            break;
979
980        default:
981            assert(0);
982    }
983
984    torrentUnlock( t );
985}
986
987static void
988ensureAtomExists( Torrent * t, const struct in_addr * addr, uint16_t port, uint8_t flags, uint8_t from )
989{
990    if( getExistingAtom( t, addr ) == NULL )
991    {
992        struct peer_atom * a;
993        a = tr_new0( struct peer_atom, 1 );
994        a->addr = *addr;
995        a->port = port;
996        a->flags = flags;
997        a->from = from;
998        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr(&a->addr,a->port) );
999        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
1000    }
1001}
1002
1003static void
1004maybeEnsureAtomExists( Torrent * t, const struct in_addr * addr, uint16_t port, uint8_t flags, uint8_t from )
1005{
1006    if( tr_blocklistHasAddress( t->manager->handle, addr ) )
1007    {
1008        char * fmt = NULL;
1009        switch( from ) {
1010            case TR_PEER_FROM_TRACKER:  fmt = _( "Banned IP address \"%s\" was given to us by the tracker" ); break;
1011            case TR_PEER_FROM_CACHE:    fmt = _( "Banned IP address \"%s\" was found in the cache" ); break;
1012            case TR_PEER_FROM_PEX:      fmt = _( "Banned IP address \"%s\" was given to us by another peer" ); break;
1013            case TR_PEER_FROM_INCOMING: fmt = _( "Banned IP address \"%s\" tried to connect to us" ); break;
1014        }
1015        tr_torinf( t->tor, fmt, inet_ntoa( *addr ) );
1016    }
1017    else
1018    {
1019        ensureAtomExists( t, addr, port, flags, from );
1020    }
1021}
1022
1023static int
1024getMaxPeerCount( const tr_torrent * tor UNUSED )
1025{
1026    return tor->maxConnectedPeers;
1027}
1028
1029/* FIXME: this is kind of a mess. */
1030static void
1031myHandshakeDoneCB( tr_handshake    * handshake,
1032                   tr_peerIo       * io,
1033                   int               isConnected,
1034                   const uint8_t   * peer_id,
1035                   void            * vmanager )
1036{
1037    int ok = isConnected;
1038    uint16_t port;
1039    const struct in_addr * addr;
1040    tr_peerMgr * manager = (tr_peerMgr*) vmanager;
1041    Torrent * t;
1042    tr_handshake * ours;
1043
1044    assert( io != NULL );
1045    assert( isConnected==0 || isConnected==1 );
1046
1047    t = tr_peerIoHasTorrentHash( io )
1048        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1049        : NULL;
1050
1051    if( tr_peerIoIsIncoming ( io ) )
1052        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
1053                                        handshake, handshakeCompare );
1054    else if( t != NULL )
1055        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
1056                                        handshake, handshakeCompare );
1057    else
1058        ours = handshake;
1059
1060    assert( ours != NULL );
1061    assert( ours == handshake );
1062
1063    if( t != NULL )
1064        torrentLock( t );
1065
1066    addr = tr_peerIoGetAddress( io, &port );
1067
1068    if( !ok || !t || !t->isRunning )
1069    {
1070        if( t ) {
1071            struct peer_atom * atom = getExistingAtom( t, addr );
1072            if( atom )
1073                ++atom->numFails;
1074        }
1075
1076        tr_peerIoFree( io );
1077    }
1078    else /* looking good */
1079    {
1080        struct peer_atom * atom;
1081        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1082        atom = getExistingAtom( t, addr );
1083
1084        if( atom->myflags & MYFLAG_BANNED )
1085        {
1086            tordbg( t, "banned peer %s tried to reconnect", tr_peerIoAddrStr(&atom->addr,atom->port) );
1087            tr_peerIoFree( io );
1088        }
1089        else if( tr_ptrArraySize( t->peers ) >= getMaxPeerCount( t->tor ) )
1090        {
1091            tr_peerIoFree( io );
1092        }
1093        else
1094        {
1095            tr_peer * peer = getExistingPeer( t, addr );
1096
1097            if( peer != NULL ) /* we already have this peer */
1098            {
1099                tr_peerIoFree( io );
1100            }
1101            else
1102            {
1103                peer = getPeer( t, addr );
1104                tr_free( peer->client );
1105                peer->client = peer_id ? tr_clientForId( peer_id ) : NULL;
1106                peer->port = port;
1107                peer->io = io;
1108                peer->msgs = tr_peerMsgsNew( t->tor, peer, msgsCallbackFunc, t, &peer->msgsTag );
1109                atom->time = time( NULL );
1110            }
1111        }
1112    }
1113
1114    if( t != NULL )
1115        torrentUnlock( t );
1116}
1117
1118void
1119tr_peerMgrAddIncoming( tr_peerMgr      * manager,
1120                       struct in_addr  * addr,
1121                       uint16_t          port,
1122                       int               socket )
1123{
1124    managerLock( manager );
1125
1126    if( tr_blocklistHasAddress( manager->handle, addr ) )
1127    {
1128        tr_inf( _( "Banned IP address \"%s\" tried to connect to us" ),
1129                inet_ntoa( *addr ) );
1130        tr_netClose( socket );
1131    }
1132    else if( getExistingHandshake( manager->incomingHandshakes, addr ) )
1133    {
1134        tr_netClose( socket );
1135    }
1136    else /* we don't have a connetion to them yet... */
1137    {
1138        tr_peerIo * io;
1139        tr_handshake * handshake;
1140
1141        tordbg( NULL, "Got an INCOMING connection with %s", tr_peerIoAddrStr( addr, port ) );
1142
1143        io = tr_peerIoNewIncoming( manager->handle, addr, port, socket );
1144
1145        handshake = tr_handshakeNew( io,
1146                                     manager->handle->encryptionMode,
1147                                     myHandshakeDoneCB,
1148                                     manager );
1149
1150        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake, handshakeCompare );
1151    }
1152
1153    managerUnlock( manager );
1154}
1155
1156void
1157tr_peerMgrAddPex( tr_peerMgr     * manager,
1158                  const uint8_t  * torrentHash,
1159                  uint8_t          from,
1160                  const tr_pex   * pex )
1161{
1162    Torrent * t;
1163    managerLock( manager );
1164
1165    t = getExistingTorrent( manager, torrentHash );
1166    maybeEnsureAtomExists( t, &pex->in_addr, pex->port, pex->flags, from );
1167
1168    managerUnlock( manager );
1169}
1170
1171void
1172tr_peerMgrAddPeers( tr_peerMgr    * manager,
1173                    const uint8_t * torrentHash,
1174                    uint8_t         from,
1175                    const uint8_t * peerCompact,
1176                    int             peerCount )
1177{
1178    int i;
1179    const uint8_t * walk = peerCompact;
1180    Torrent * t;
1181
1182    managerLock( manager );
1183
1184    t = getExistingTorrent( manager, torrentHash );
1185    for( i=0; t!=NULL && i<peerCount; ++i )
1186    {
1187        struct in_addr addr;
1188        uint16_t port;
1189        memcpy( &addr, walk, 4 ); walk += 4;
1190        memcpy( &port, walk, 2 ); walk += 2;
1191        maybeEnsureAtomExists( t, &addr, port, 0, from );
1192    }
1193
1194    managerUnlock( manager );
1195}
1196
1197/**
1198***
1199**/
1200
1201void
1202tr_peerMgrSetBlame( tr_peerMgr     * manager,
1203                    const uint8_t  * torrentHash,
1204                    int              pieceIndex,
1205                    int              success )
1206{
1207    if( !success )
1208    {
1209        int peerCount, i;
1210        Torrent * t = getExistingTorrent( manager, torrentHash );
1211        tr_peer ** peers;
1212
1213        assert( torrentIsLocked( t ) );
1214
1215        peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1216        for( i=0; i<peerCount; ++i )
1217        {
1218            tr_peer * peer = peers[i];
1219            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1220            {
1221                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1222                           tr_peerIoAddrStr(&peer->in_addr,peer->port),
1223                           pieceIndex, (int)peer->strikes+1 );
1224                addStrike( t, peer );
1225            }
1226        }
1227    }
1228}
1229
1230int
1231tr_pexCompare( const void * va, const void * vb )
1232{
1233    const tr_pex * a = (const tr_pex *) va;
1234    const tr_pex * b = (const tr_pex *) vb;
1235    int i = memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
1236    if( i ) return i;
1237    if( a->port < b->port ) return -1;
1238    if( a->port > b->port ) return 1;
1239    return 0;
1240}
1241
1242int tr_pexCompare( const void * a, const void * b );
1243
1244static int
1245peerPrefersCrypto( const tr_peer * peer )
1246{
1247    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1248        return TRUE;
1249
1250    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1251        return FALSE;
1252
1253    return tr_peerIoIsEncrypted( peer->io );
1254};
1255
1256int
1257tr_peerMgrGetPeers( tr_peerMgr      * manager,
1258                    const uint8_t   * torrentHash,
1259                    tr_pex         ** setme_pex )
1260{
1261    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1262    int i, peerCount;
1263    const tr_peer ** peers;
1264    tr_pex * pex;
1265    tr_pex * walk;
1266
1267    managerLock( manager );
1268
1269    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1270    pex = walk = tr_new( tr_pex, peerCount );
1271
1272    for( i=0; i<peerCount; ++i, ++walk )
1273    {
1274        const tr_peer * peer = peers[i];
1275
1276        walk->in_addr = peer->in_addr;
1277
1278        walk->port = peer->port;
1279
1280        walk->flags = 0;
1281        if( peerPrefersCrypto(peer) )  walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1282        if( peer->progress >= 1.0 )    walk->flags |= ADDED_F_SEED_FLAG;
1283    }
1284
1285    assert( ( walk - pex ) == peerCount );
1286    qsort( pex, peerCount, sizeof(tr_pex), tr_pexCompare );
1287    *setme_pex = pex;
1288
1289    managerUnlock( manager );
1290
1291    return peerCount;
1292}
1293
1294static int reconnectPulse( void * vtorrent );
1295static int rechokePulse( void * vtorrent );
1296static int swiftPulse( void * vtorrent );
1297
1298void
1299tr_peerMgrStartTorrent( tr_peerMgr     * manager,
1300                        const uint8_t  * torrentHash )
1301{
1302    Torrent * t;
1303
1304    managerLock( manager );
1305
1306    t = getExistingTorrent( manager, torrentHash );
1307
1308    assert( t != NULL );
1309    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1310    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1311    assert( ( t->isRunning != 0 ) == ( t->swiftTimer != NULL ) );
1312
1313    if( !t->isRunning )
1314    {
1315        t->isRunning = 1;
1316
1317        t->reconnectTimer = tr_timerNew( t->manager->handle,
1318                                         reconnectPulse, t,
1319                                         RECONNECT_PERIOD_MSEC );
1320
1321        t->rechokeTimer = tr_timerNew( t->manager->handle,
1322                                       rechokePulse, t,
1323                                       RECHOKE_PERIOD_MSEC );
1324
1325        t->swiftTimer = tr_timerNew( t->manager->handle,
1326                                     swiftPulse, t,
1327                                     SWIFT_PERIOD_MSEC );
1328
1329        reconnectPulse( t );
1330
1331        rechokePulse( t );
1332
1333        swiftPulse( t );
1334    }
1335
1336    managerUnlock( manager );
1337}
1338
1339static void
1340stopTorrent( Torrent * t )
1341{
1342    assert( torrentIsLocked( t ) );
1343
1344    t->isRunning = 0;
1345    tr_timerFree( &t->rechokeTimer );
1346    tr_timerFree( &t->reconnectTimer );
1347    tr_timerFree( &t->swiftTimer );
1348
1349    /* disconnect the peers. */
1350    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)peerDestructor );
1351    tr_ptrArrayClear( t->peers );
1352
1353    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1354     * which removes the handshake from t->outgoingHandshakes... */
1355    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1356        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1357}
1358void
1359tr_peerMgrStopTorrent( tr_peerMgr     * manager,
1360                       const uint8_t  * torrentHash)
1361{
1362    managerLock( manager );
1363
1364    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1365
1366    managerUnlock( manager );
1367}
1368
1369void
1370tr_peerMgrAddTorrent( tr_peerMgr * manager,
1371                      tr_torrent * tor )
1372{
1373    Torrent * t;
1374
1375    managerLock( manager );
1376
1377    assert( tor != NULL );
1378    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1379
1380    t = torrentConstructor( manager, tor );
1381    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1382
1383    managerUnlock( manager );
1384}
1385
1386void
1387tr_peerMgrRemoveTorrent( tr_peerMgr     * manager,
1388                         const uint8_t  * torrentHash )
1389{
1390    Torrent * t;
1391
1392    managerLock( manager );
1393
1394    t = getExistingTorrent( manager, torrentHash );
1395    assert( t != NULL );
1396    stopTorrent( t );
1397    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1398    torrentDestructor( t );
1399
1400    managerUnlock( manager );
1401}
1402
1403void
1404tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1405                               const uint8_t    * torrentHash,
1406                               int8_t           * tab,
1407                               int                tabCount )
1408{
1409    int i;
1410    const Torrent * t;
1411    const tr_torrent * tor;
1412    float interval;
1413
1414    managerLock( (tr_peerMgr*)manager );
1415
1416    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1417    tor = t->tor;
1418    interval = tor->info.pieceCount / (float)tabCount;
1419
1420    memset( tab, 0, tabCount );
1421
1422    for( i=0; i<tabCount; ++i )
1423    {
1424        const int piece = i * interval;
1425
1426        if( tor == NULL )
1427            tab[i] = 0;
1428        else if( tr_cpPieceIsComplete( tor->completion, piece ) )
1429            tab[i] = -1;
1430        else {
1431            int j, peerCount;
1432            const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1433            for( j=0; j<peerCount; ++j )
1434                if( tr_bitfieldHas( peers[j]->have, i ) )
1435                    ++tab[i];
1436        }
1437    }
1438
1439    managerUnlock( (tr_peerMgr*)manager );
1440}
1441
1442/* Returns the pieces that are available from peers */
1443tr_bitfield*
1444tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1445                        const uint8_t    * torrentHash )
1446{
1447    int i, size;
1448    Torrent * t;
1449    tr_peer ** peers;
1450    tr_bitfield * pieces;
1451    managerLock( (tr_peerMgr*)manager );
1452
1453    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1454    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1455    peers = getConnectedPeers( t, &size );
1456    for( i=0; i<size; ++i )
1457        tr_bitfieldOr( pieces, peers[i]->have );
1458
1459    managerUnlock( (tr_peerMgr*)manager );
1460    tr_free( peers );
1461    return pieces;
1462}
1463
1464int
1465tr_peerMgrHasConnections( const tr_peerMgr * manager,
1466                          const uint8_t    * torrentHash )
1467{
1468    int ret;
1469    const Torrent * t;
1470    managerLock( (tr_peerMgr*)manager );
1471
1472    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1473    ret = t && tr_ptrArraySize( t->peers );
1474
1475    managerUnlock( (tr_peerMgr*)manager );
1476    return ret;
1477}
1478
1479void
1480tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1481                        const uint8_t    * torrentHash,
1482                        int              * setmePeersKnown,
1483                        int              * setmePeersConnected,
1484                        int              * setmePeersSendingToUs,
1485                        int              * setmePeersGettingFromUs,
1486                        int              * setmePeersFrom )
1487{
1488    int i, size;
1489    const Torrent * t;
1490    const tr_peer ** peers;
1491
1492    managerLock( (tr_peerMgr*)manager );
1493
1494    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1495    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1496
1497    *setmePeersKnown          = tr_ptrArraySize( t->pool );
1498    *setmePeersConnected      = 0;
1499    *setmePeersSendingToUs    = 0;
1500    *setmePeersGettingFromUs  = 0;
1501
1502    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1503        setmePeersFrom[i] = 0;
1504
1505    for( i=0; i<size; ++i )
1506    {
1507        const tr_peer * peer = peers[i];
1508        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1509
1510        if( peer->io == NULL ) /* not connected */
1511            continue;
1512
1513        ++*setmePeersConnected;
1514
1515        ++setmePeersFrom[atom->from];
1516
1517        if( clientIsDownloadingFrom( peer ) )
1518            ++*setmePeersSendingToUs;
1519
1520        if( clientIsUploadingTo( peer ) )
1521            ++*setmePeersGettingFromUs;
1522    }
1523
1524    managerUnlock( (tr_peerMgr*)manager );
1525}
1526
1527struct tr_peer_stat *
1528tr_peerMgrPeerStats( const tr_peerMgr  * manager,
1529                     const uint8_t     * torrentHash,
1530                     int               * setmeCount UNUSED )
1531{
1532    int i, size;
1533    const Torrent * t;
1534    tr_peer ** peers;
1535    tr_peer_stat * ret;
1536
1537    assert( manager != NULL );
1538    managerLock( (tr_peerMgr*)manager );
1539
1540    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1541    peers = getConnectedPeers( (Torrent*)t, &size );
1542    ret = tr_new0( tr_peer_stat, size );
1543
1544    for( i=0; i<size; ++i )
1545    {
1546        char * pch;
1547        const tr_peer * peer = peers[i];
1548        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1549        tr_peer_stat * stat = ret + i;
1550
1551        tr_netNtop( &peer->in_addr, stat->addr, sizeof(stat->addr) );
1552        strlcpy( stat->client, (peer->client ? peer->client : ""), sizeof(stat->client) );
1553        stat->port               = peer->port;
1554        stat->from               = atom->from;
1555        stat->progress           = peer->progress;
1556        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1557        stat->uploadToRate       = peer->rateToPeer;
1558        stat->downloadFromRate   = peer->rateToClient;
1559        stat->peerIsChoked       = peer->peerIsChoked;
1560        stat->peerIsInterested   = peer->peerIsInterested;
1561        stat->clientIsChoked     = peer->clientIsChoked;
1562        stat->clientIsInterested = peer->clientIsInterested;
1563        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1564        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1565        stat->isUploadingTo      = clientIsUploadingTo( peer );
1566
1567        pch = stat->flagStr;
1568        if( t->optimistic == peer ) *pch++ = 'O';
1569        if( stat->isDownloadingFrom ) *pch++ = 'D';
1570        else if( stat->clientIsInterested ) *pch++ = 'd';
1571        if( stat->isUploadingTo ) *pch++ = 'U';
1572        else if( stat->peerIsInterested ) *pch++ = 'u';
1573        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
1574        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1575        if( stat->isEncrypted ) *pch++ = 'E';
1576        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1577        if( stat->isIncoming ) *pch++ = 'I';
1578        *pch = '\0';
1579    }
1580
1581    *setmeCount = size;
1582    tr_free( peers );
1583
1584    managerUnlock( (tr_peerMgr*)manager );
1585    return ret;
1586}
1587
1588/**
1589***
1590**/
1591
1592struct ChokeData
1593{
1594    uint8_t doUnchoke;
1595    uint8_t isInterested;
1596    uint32_t rate;
1597    tr_peer * peer;
1598};
1599
1600static int
1601compareChoke( const void * va, const void * vb )
1602{
1603    const struct ChokeData * a = va;
1604    const struct ChokeData * b = vb;
1605    return -tr_compareUint32( a->rate, b->rate );
1606}
1607
1608static int
1609isNew( const tr_peer * peer )
1610{
1611    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1612}
1613
1614static int
1615isSame( const tr_peer * peer )
1616{
1617    return peer && peer->client && strstr( peer->client, "Transmission" );
1618}
1619
1620/**
1621***
1622**/
1623
1624static double
1625getWeightedRate( const tr_peer * peer, int clientIsSeed )
1626{
1627    return (int)( 10.0 * ( clientIsSeed ? peer->rateToPeer
1628                                        : peer->rateToClient ) );
1629}
1630
1631static void
1632rechoke( Torrent * t )
1633{
1634    int i, n, peerCount, size, unchokedInterested;
1635    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1636    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1637    const int clientIsSeed = tr_torrentIsSeed( t->tor );
1638
1639    assert( torrentIsLocked( t ) );
1640   
1641    /* sort the peers by preference and rate */
1642    for( i=0, size=0; i<peerCount; ++i )
1643    {
1644        tr_peer * peer = peers[i];
1645        if( peer->progress >= 1.0 ) /* choke all seeds */
1646            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1647        else {
1648            struct ChokeData * node = &choke[size++];
1649            node->peer = peer;
1650            node->isInterested = peer->peerIsInterested;
1651            node->rate = getWeightedRate( peer, clientIsSeed );
1652        }
1653    }
1654
1655    qsort( choke, size, sizeof(struct ChokeData), compareChoke );
1656
1657    /**
1658     * Reciprocation and number of uploads capping is managed by unchoking
1659     * the N peers which have the best upload rate and are interested.
1660     * This maximizes the client's download rate. These N peers are
1661     * referred to as downloaders, because they are interested in downloading
1662     * from the client.
1663     *
1664     * Peers which have a better upload rate (as compared to the downloaders)
1665     * but aren't interested get unchoked. If they become interested, the
1666     * downloader with the worst upload rate gets choked. If a client has
1667     * a complete file, it uses its upload rate rather than its download
1668     * rate to decide which peers to unchoke.
1669     */
1670    unchokedInterested = 0;
1671    for( i=0; i<size && unchokedInterested<MAX_UNCHOKED_PEERS; ++i ) {
1672        choke[i].doUnchoke = 1;
1673        if( choke[i].isInterested )
1674            ++unchokedInterested;
1675    }
1676    n = i;
1677    while( i<size )
1678        choke[i++].doUnchoke = 0;
1679
1680    /* optimistic unchoke */
1681    if( i < size )
1682    {
1683        struct ChokeData * c;
1684        tr_ptrArray * randPool = tr_ptrArrayNew( );
1685        for( ; i<size; ++i )
1686        {
1687            const tr_peer * peer = choke[i].peer;
1688            int x=1, y;
1689            if( isNew( peer ) ) x *= 3;
1690            if( isSame( peer ) ) x *= 3;
1691            for( y=0; y<x; ++y )
1692                tr_ptrArrayAppend( randPool, choke );
1693        }
1694        i = tr_rand( tr_ptrArraySize( randPool ) );
1695        c = ( struct ChokeData* )tr_ptrArrayNth( randPool, i);
1696        c->doUnchoke = 1;
1697        t->optimistic = c->peer;
1698        tr_ptrArrayFree( randPool, NULL );
1699    }
1700
1701    for( i=0; i<size; ++i )
1702        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1703
1704    /* cleanup */
1705    tr_free( choke );
1706    tr_free( peers );
1707}
1708
1709static int
1710rechokePulse( void * vtorrent )
1711{
1712    Torrent * t = vtorrent;
1713    torrentLock( t );
1714    rechoke( t );
1715    torrentUnlock( t );
1716    return TRUE;
1717}
1718
1719/***
1720****
1721***/
1722
1723static int
1724swiftPulse( void * vtorrent )
1725{
1726    Torrent * t = vtorrent;
1727    torrentLock( t );
1728
1729    if( !tr_torrentIsSeed( t->tor ) )
1730    {
1731        int i;
1732        int peerCount = 0;
1733        int deadbeatCount = 0;
1734        tr_peer ** peers = getConnectedPeers( t, &peerCount );
1735        tr_peer ** deadbeats = tr_new( tr_peer*, peerCount );
1736
1737        const double ul_KiBsec = tr_rcRate( t->tor->upload );
1738        const double ul_KiB = ul_KiBsec * (SWIFT_PERIOD_MSEC/1000.0);
1739        const double ul_bytes = ul_KiB * 1024;
1740        const double freeCreditTotal = ul_bytes * SWIFT_LARGESSE;
1741        int freeCreditPerPeer;
1742
1743        for( i=0; i<peerCount; ++i ) {
1744            tr_peer * peer = peers[i];
1745            if( peer->credit <= 0 )
1746                deadbeats[deadbeatCount++] =  peer;
1747        }
1748
1749        freeCreditPerPeer = (int)( freeCreditTotal / deadbeatCount );
1750        for( i=0; i<deadbeatCount; ++i )
1751            deadbeats[i]->credit = freeCreditPerPeer;
1752
1753        tordbg( t, "%d deadbeats, "
1754            "who are each being granted %d bytes' credit "
1755            "for a total of %.1f KiB, "
1756            "%d%% of the torrent's ul speed %.1f\n",
1757            deadbeatCount, freeCreditPerPeer,
1758            ul_KiBsec*SWIFT_LARGESSE, (int)(SWIFT_LARGESSE*100), ul_KiBsec );
1759
1760        tr_free( deadbeats );
1761        tr_free( peers );
1762    }
1763
1764    torrentUnlock( t );
1765    return TRUE;
1766}
1767
1768/***
1769****
1770****  Life and Death
1771****
1772***/
1773
1774static int
1775shouldPeerBeClosed( const Torrent * t, const tr_peer * peer, int peerCount )
1776{
1777    const tr_torrent * tor = t->tor;
1778    const time_t now = time( NULL );
1779    const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1780
1781    /* if it's marked for purging, close it */
1782    if( peer->doPurge ) {
1783        tordbg( t, "purging peer %s because its doPurge flag is set", tr_peerIoAddrStr(&atom->addr,atom->port) );
1784        return TRUE;
1785    }
1786
1787    /* if we're both seeds and it's been long enough for a pex exchange, close it */
1788    if( 1 ) {
1789        const int clientIsSeed = tr_torrentIsSeed( tor );
1790        const int peerIsSeed = atom->flags & ADDED_F_SEED_FLAG;
1791        if( peerIsSeed && clientIsSeed && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30) ) ) {
1792            tordbg( t, "purging peer %s because we're both seeds", tr_peerIoAddrStr(&atom->addr,atom->port) );
1793            return TRUE;
1794        }
1795    }
1796
1797    /* disconnect if it's been too long since piece data has been transferred.
1798     * this is on a sliding scale based on number of available peers... */
1799    if( 1 ) {
1800        const int relaxStrictnessIfFewerThanN = (int)((getMaxPeerCount(tor) * 0.9) + 0.5);
1801        /* if we have >= relaxIfFewerThan, strictness is 100%.
1802         * if we have zero connections, strictness is 0% */
1803        const double strictness = peerCount >= relaxStrictnessIfFewerThanN
1804            ? 1.0
1805            : peerCount / (double)relaxStrictnessIfFewerThanN;
1806        const int lo = MIN_UPLOAD_IDLE_SECS;
1807        const int hi = MAX_UPLOAD_IDLE_SECS;
1808        const int limit = lo + ((hi-lo) * strictness);
1809        const time_t then = peer->pieceDataActivityDate;
1810        const int idleTime = then ? (now-then) : 0;
1811        if( idleTime > limit ) {
1812            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
1813                       tr_peerIoAddrStr(&atom->addr,atom->port), idleTime );
1814            return TRUE;
1815        }
1816    }
1817
1818    return FALSE;
1819}
1820
1821static tr_peer **
1822getPeersToClose( Torrent * t, int * setmeSize )
1823{
1824    int i, peerCount, outsize;
1825    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( t->peers, &peerCount );
1826    struct tr_peer ** ret = tr_new( tr_peer*, peerCount );
1827
1828    assert( torrentIsLocked( t ) );
1829
1830    for( i=outsize=0; i<peerCount; ++i )
1831        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
1832            ret[outsize++] = peers[i];
1833
1834    *setmeSize = outsize;
1835    return ret;
1836}
1837
1838static int
1839compareCandidates( const void * va, const void * vb )
1840{
1841    const struct peer_atom * a = * (const struct peer_atom**) va;
1842    const struct peer_atom * b = * (const struct peer_atom**) vb;
1843    int i;
1844
1845    if( a->piece_data_time > b->piece_data_time ) return -1;
1846    if( a->piece_data_time < b->piece_data_time ) return  1;
1847
1848    if(( i = tr_compareUint16( a->numFails, b->numFails )))
1849        return i;
1850
1851    if( a->time != b->time )
1852        return a->time < b->time ? -1 : 1;
1853
1854    return 0;
1855}
1856
1857static struct peer_atom **
1858getPeerCandidates( Torrent * t, int * setmeSize )
1859{
1860    int i, atomCount, retCount;
1861    struct peer_atom ** atoms;
1862    struct peer_atom ** ret;
1863    const time_t now = time( NULL );
1864    const int seed = tr_torrentIsSeed( t->tor );
1865
1866    assert( torrentIsLocked( t ) );
1867
1868    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &atomCount );
1869    ret = tr_new( struct peer_atom*, atomCount );
1870    for( i=retCount=0; i<atomCount; ++i )
1871    {
1872        struct peer_atom * atom = atoms[i];
1873
1874        /* peer fed us too much bad data ... we only keep it around
1875         * now to weed it out in case someone sends it to us via pex */
1876        if( atom->myflags & MYFLAG_BANNED )
1877            continue;
1878
1879        /* peer was unconnectable before, so we're not going to keep trying.
1880         * this is needs a separate flag from `banned', since if they try
1881         * to connect to us later, we'll let them in */
1882        if( atom->myflags & MYFLAG_UNREACHABLE )
1883            continue;
1884
1885        /* we don't need two connections to the same peer... */
1886        if( peerIsInUse( t, &atom->addr ) )
1887            continue;
1888
1889        /* no need to connect if we're both seeds... */
1890        if( seed && (atom->flags & ADDED_F_SEED_FLAG) )
1891            continue;
1892
1893        /* we're wasting our time trying to connect to this bozo. */
1894        if( atom->numFails > 3 )
1895            continue;
1896
1897        /* If we were connected to this peer recently and transferring
1898         * piece data, try to reconnect -- network troubles may have
1899         * disconnected us.  but if we weren't sharing piece data,
1900         * hold off on this peer to give another one a try instead */
1901        if( ( now - atom->piece_data_time ) > 30 )
1902        {
1903            int minWait = (60 * 10); /* ten minutes */
1904            int maxWait = (60 * 30); /* thirty minutes */
1905            int wait = atom->numFails * minWait;
1906            if( wait < minWait ) wait = minWait;
1907            if( wait > maxWait ) wait = maxWait;
1908            if( ( now - atom->time ) < wait ) {
1909                tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
1910                        i, tr_peerIoAddrStr(&atom->addr,atom->port), wait );
1911                continue;
1912            }
1913        }
1914
1915        /* Don't connect to peers in our blocklist */
1916        if( tr_blocklistHasAddress( t->manager->handle, &atom->addr ) )
1917            continue;
1918
1919        ret[retCount++] = atom;
1920    }
1921
1922    qsort( ret, retCount, sizeof(struct peer_atom*), compareCandidates );
1923    *setmeSize = retCount;
1924    return ret;
1925}
1926
1927static int
1928reconnectPulse( void * vtorrent )
1929{
1930    Torrent * t = vtorrent;
1931    static time_t prevTime = 0;
1932    static int newConnectionsThisSecond = 0;
1933    time_t now;
1934
1935    torrentLock( t );
1936
1937    now = time( NULL );
1938    if( prevTime != now )
1939    {
1940        prevTime = now;
1941        newConnectionsThisSecond = 0;
1942    }
1943
1944    if( !t->isRunning )
1945    {
1946        removeAllPeers( t );
1947    }
1948    else
1949    {
1950        int i, nCandidates, nBad;
1951        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
1952        struct tr_peer ** connections = getPeersToClose( t, &nBad );
1953
1954        if( nBad || nCandidates )
1955            tordbg( t, "reconnect pulse for [%s]: %d bad connections, "
1956                       "%d connection candidates, %d atoms, max per pulse is %d",
1957                       t->tor->info.name, nBad, nCandidates,
1958                       tr_ptrArraySize(t->pool),
1959                       (int)MAX_RECONNECTIONS_PER_PULSE );
1960
1961        /* disconnect some peers.
1962           if we got transferred piece data, then they might be good peers,
1963           so reset their `numFails' weight to zero.  otherwise we connected
1964           to them fruitlessly, so mark it as another fail */
1965        for( i=0; i<nBad; ++i ) {
1966            tr_peer * peer = connections[i];
1967            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1968            if( peer->pieceDataActivityDate )
1969                atom->numFails = 0;
1970            else
1971                ++atom->numFails;
1972            removePeer( t, peer );
1973        }
1974
1975        /* add some new ones */
1976        for( i=0;    i < nCandidates
1977                  && i < MAX_RECONNECTIONS_PER_PULSE
1978                  && newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND; ++i )
1979        {
1980            tr_peerMgr * mgr = t->manager;
1981            struct peer_atom * atom = candidates[i];
1982            tr_peerIo * io;
1983
1984            tordbg( t, "Starting an OUTGOING connection with %s",
1985                       tr_peerIoAddrStr( &atom->addr, atom->port ) );
1986
1987            io = tr_peerIoNewOutgoing( mgr->handle, &atom->addr, atom->port, t->hash );
1988            if( io == NULL )
1989            {
1990                atom->myflags |= MYFLAG_UNREACHABLE;
1991            }
1992            else
1993            {
1994                tr_handshake * handshake = tr_handshakeNew( io,
1995                                                            mgr->handle->encryptionMode,
1996                                                            myHandshakeDoneCB,
1997                                                            mgr );
1998
1999                assert( tr_peerIoGetTorrentHash( io ) != NULL );
2000
2001                ++newConnectionsThisSecond;
2002
2003                tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake, handshakeCompare );
2004            }
2005
2006            atom->time = time( NULL );
2007        }
2008
2009        /* cleanup */
2010        tr_free( connections );
2011        tr_free( candidates );
2012    }
2013
2014    torrentUnlock( t );
2015    return TRUE;
2016}
Note: See TracBrowser for help on using the repository browser.