source: trunk/libtransmission/peer-mgr.c @ 5650

Last change on this file since 5650 was 5650, checked in by charles, 14 years ago

peer-msgs: faster upload speeds in situations with few peers. this patch needs wider testing for side-effects wrt speed limits.

  • Property svn:keywords set to Date Rev Author Id
File size: 56.0 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 5650 2008-04-19 19:37:05Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp, strstr */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17#include <limits.h> /* INT_MAX */
18
19#include <libgen.h> /* basename */
20
21#include <event.h>
22
23#include "transmission.h"
24#include "blocklist.h"
25#include "clients.h"
26#include "completion.h"
27#include "crypto.h"
28#include "handshake.h"
29#include "net.h"
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-mgr-private.h"
33#include "peer-msgs.h"
34#include "ptrarray.h"
35#include "ratecontrol.h"
36#include "torrent.h"
37#include "trcompat.h" /* strlcpy */
38#include "trevent.h"
39#include "utils.h"
40
41/**
42*** The "SWIFT" system is described by Karthik Tamilmani,
43*** Vinay Pai, and Alexander Mohr of Stony Brook University
44*** in their paper "SWIFT: A System With Incentives For Trading"
45*** http://citeseer.ist.psu.edu/tamilmani04swift.html
46***
47*** More SWIFT constants are defined in peer-mgr-private.h
48**/
49
50/**
51 * Allow new peers to download this many bytes from
52 * us when getting started.  This can prevent gridlock
53 * with other peers using tit-for-tat algorithms
54 */
55static const int SWIFT_INITIAL_CREDIT = 64 * 1024; /* 64 KiB */
56
57/**
58 * We expend a fraction of our torrent's total upload speed
59 * on largesse by uniformly distributing free credit to
60 * all of our peers.  This too helps prevent gridlock.
61 */
62static const double SWIFT_LARGESSE = 0.15; /* 15% of our UL */
63
64/**
65 * How frequently to extend largesse-based credit
66 */
67static const int SWIFT_PERIOD_MSEC = 5000;
68
69
70enum
71{
72    /* how frequently to change which peers are choked */
73    RECHOKE_PERIOD_MSEC = (10 * 1000),
74
75    /* how frequently to refill peers' request lists */
76    REFILL_PERIOD_MSEC = 666,
77
78    /* following the BT spec, we consider ourselves `snubbed' if
79     * we're we don't get piece data from a peer in this long */
80    SNUBBED_SEC = 60,
81
82    /* when many peers are available, keep idle ones this long */
83    MIN_UPLOAD_IDLE_SECS = (60 * 3),
84
85    /* when few peers are available, keep idle ones this long */
86    MAX_UPLOAD_IDLE_SECS = (60 * 10),
87
88    /* how frequently to decide which peers live and die */
89    RECONNECT_PERIOD_MSEC = (2 * 1000),
90
91    /* max # of peers to ask fer per torrent per reconnect pulse */
92    MAX_RECONNECTIONS_PER_PULSE = 1,
93
94    /* max number of peers to ask for per second overall.
95     * this throttle is to avoid overloading the router */
96    MAX_CONNECTIONS_PER_SECOND = 8,
97
98    /* number of unchoked peers per torrent */
99    MAX_UNCHOKED_PEERS = 12,
100
101    /* number of bad pieces a peer is allowed to send before we ban them */
102    MAX_BAD_PIECES_PER_PEER = 3,
103
104    /* use for bitwise operations w/peer_atom.myflags */
105    MYFLAG_BANNED = 1,
106
107    /* unreachable for now... but not banned.  if they try to connect to us it's okay */
108    MYFLAG_UNREACHABLE = 2
109};
110
111
112/**
113***
114**/
115
116/* We keep one of these for every peer we know about, whether
117 * it's connected or not, so the struct must be small.
118 * When our current connections underperform, we dip back
119 * into this list for new ones. */
120struct peer_atom
121{   
122    uint8_t from;
123    uint8_t flags; /* these match the added_f flags */
124    uint8_t myflags; /* flags that aren't defined in added_f */
125    uint16_t port;
126    uint16_t numFails;
127    struct in_addr addr;
128    time_t time;
129    time_t piece_data_time;
130};
131
132typedef struct
133{
134    uint8_t hash[SHA_DIGEST_LENGTH];
135    tr_ptrArray * outgoingHandshakes; /* tr_handshake */
136    tr_ptrArray * pool; /* struct peer_atom */
137    tr_ptrArray * peers; /* tr_peer */
138    tr_timer * reconnectTimer;
139    tr_timer * rechokeTimer;
140    tr_timer * refillTimer;
141    tr_timer * swiftTimer;
142    tr_torrent * tor;
143    tr_peer * optimistic; /* the optimistic peer, or NULL if none */
144    tr_bitfield * requested;
145
146    unsigned int isRunning : 1;
147
148    struct tr_peerMgr * manager;
149}
150Torrent;
151
152struct tr_peerMgr
153{
154    tr_handle * handle;
155    tr_ptrArray * torrents; /* Torrent */
156    tr_ptrArray * incomingHandshakes; /* tr_handshake */
157};
158
159/**
160***
161**/
162
163static void
164myDebug( const char * file, int line, const Torrent * t, const char * fmt, ... )
165{
166    FILE * fp = tr_getLog( );
167    if( fp != NULL )
168    {
169        va_list args;
170        char timestr[64];
171        struct evbuffer * buf = evbuffer_new( );
172        char * myfile = tr_strdup( file );
173
174        evbuffer_add_printf( buf, "[%s] ", tr_getLogTimeStr( timestr, sizeof(timestr) ) );
175        if( t != NULL )
176            evbuffer_add_printf( buf, "%s ", t->tor->info.name );
177        va_start( args, fmt );
178        evbuffer_add_vprintf( buf, fmt, args );
179        va_end( args );
180        evbuffer_add_printf( buf, " (%s:%d)\n", basename(myfile), line );
181        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
182
183        tr_free( myfile );
184        evbuffer_free( buf );
185    }
186}
187
188#define tordbg(t, fmt...) myDebug(__FILE__, __LINE__, t, ##fmt )
189
190/**
191***
192**/
193
194static void
195managerLock( const struct tr_peerMgr * manager )
196{
197    tr_globalLock( manager->handle );
198}
199static void
200managerUnlock( const struct tr_peerMgr * manager )
201{
202    tr_globalUnlock( manager->handle );
203}
204static void
205torrentLock( Torrent * torrent )
206{
207    managerLock( torrent->manager );
208}
209static void
210torrentUnlock( Torrent * torrent )
211{
212    managerUnlock( torrent->manager );
213}
214static int
215torrentIsLocked( const Torrent * t )
216{
217    return tr_globalIsLocked( t->manager->handle );
218}
219
220/**
221***
222**/
223
224static int
225compareAddresses( const struct in_addr * a, const struct in_addr * b )
226{
227    return tr_compareUint32( a->s_addr, b->s_addr );
228}
229
230static int
231handshakeCompareToAddr( const void * va, const void * vb )
232{
233    const tr_handshake * a = va;
234    return compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
235}
236
237static int
238handshakeCompare( const void * a, const void * b )
239{
240    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
241}
242
243static tr_handshake*
244getExistingHandshake( tr_ptrArray * handshakes, const struct in_addr * in_addr )
245{
246    return tr_ptrArrayFindSorted( handshakes,
247                                  in_addr,
248                                  handshakeCompareToAddr );
249}
250
251static int
252comparePeerAtomToAddress( const void * va, const void * vb )
253{
254    const struct peer_atom * a = va;
255    return compareAddresses( &a->addr, vb );
256}
257
258static int
259comparePeerAtoms( const void * va, const void * vb )
260{
261    const struct peer_atom * b = vb;
262    return comparePeerAtomToAddress( va, &b->addr );
263}
264
265/**
266***
267**/
268
269static int
270torrentCompare( const void * va, const void * vb )
271{
272    const Torrent * a = va;
273    const Torrent * b = vb;
274    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
275}
276
277static int
278torrentCompareToHash( const void * va, const void * vb )
279{
280    const Torrent * a = va;
281    const uint8_t * b_hash = vb;
282    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
283}
284
285static Torrent*
286getExistingTorrent( tr_peerMgr * manager, const uint8_t * hash )
287{
288    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
289                                             hash,
290                                             torrentCompareToHash );
291}
292
293static int
294peerCompare( const void * va, const void * vb )
295{
296    const tr_peer * a = va;
297    const tr_peer * b = vb;
298    return compareAddresses( &a->in_addr, &b->in_addr );
299}
300
301static int
302peerCompareToAddr( const void * va, const void * vb )
303{
304    const tr_peer * a = va;
305    return compareAddresses( &a->in_addr, vb );
306}
307
308static tr_peer*
309getExistingPeer( Torrent * torrent, const struct in_addr * in_addr )
310{
311    assert( torrentIsLocked( torrent ) );
312    assert( in_addr != NULL );
313
314    return (tr_peer*) tr_ptrArrayFindSorted( torrent->peers,
315                                             in_addr,
316                                             peerCompareToAddr );
317}
318
319static struct peer_atom*
320getExistingAtom( const Torrent * t, const struct in_addr * addr )
321{
322    assert( torrentIsLocked( t ) );
323    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
324}
325
326static int
327peerIsInUse( const Torrent * ct, const struct in_addr * addr )
328{
329    Torrent * t = (Torrent*) ct;
330
331    assert( torrentIsLocked ( t ) );
332
333    return getExistingPeer( t, addr )
334        || getExistingHandshake( t->outgoingHandshakes, addr )
335        || getExistingHandshake( t->manager->incomingHandshakes, addr );
336}
337
338static tr_peer*
339peerConstructor( const struct in_addr * in_addr )
340{
341    tr_peer * p;
342    p = tr_new0( tr_peer, 1 );
343    p->credit = SWIFT_INITIAL_CREDIT;
344    p->rcToClient = tr_rcInit( );
345    p->rcToPeer = tr_rcInit( );
346    memcpy( &p->in_addr, in_addr, sizeof(struct in_addr) );
347    return p;
348}
349
350static tr_peer*
351getPeer( Torrent * torrent, const struct in_addr * in_addr )
352{
353    tr_peer * peer;
354
355    assert( torrentIsLocked( torrent ) );
356
357    peer = getExistingPeer( torrent, in_addr );
358
359    if( peer == NULL ) {
360        peer = peerConstructor( in_addr );
361        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
362    }
363
364    return peer;
365}
366
367static void
368peerDestructor( tr_peer * peer )
369{
370    assert( peer != NULL );
371    assert( peer->msgs != NULL );
372
373    tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
374    tr_peerMsgsFree( peer->msgs );
375
376    tr_peerIoFree( peer->io );
377
378    tr_bitfieldFree( peer->have );
379    tr_bitfieldFree( peer->blame );
380    tr_rcClose( peer->rcToClient );
381    tr_rcClose( peer->rcToPeer );
382    tr_free( peer->client );
383    tr_free( peer );
384}
385
386static void
387removePeer( Torrent * t, tr_peer * peer )
388{
389    tr_peer * removed;
390    struct peer_atom * atom;
391
392    assert( torrentIsLocked( t ) );
393
394    atom = getExistingAtom( t, &peer->in_addr );
395    assert( atom != NULL );
396    atom->time = time( NULL );
397
398    removed = tr_ptrArrayRemoveSorted  ( t->peers, peer, peerCompare );
399    assert( removed == peer );
400    peerDestructor( removed );
401}
402
403static void
404removeAllPeers( Torrent * t )
405{
406    while( !tr_ptrArrayEmpty( t->peers ) )
407        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
408}
409
410static void
411torrentDestructor( Torrent * t )
412{
413    uint8_t hash[SHA_DIGEST_LENGTH];
414
415    assert( t != NULL );
416    assert( !t->isRunning );
417    assert( t->peers != NULL );
418    assert( torrentIsLocked( t ) );
419    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
420    assert( tr_ptrArrayEmpty( t->peers ) );
421
422    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
423
424    tr_timerFree( &t->reconnectTimer );
425    tr_timerFree( &t->rechokeTimer );
426    tr_timerFree( &t->refillTimer );
427    tr_timerFree( &t->swiftTimer );
428
429    tr_bitfieldFree( t->requested );
430    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
431    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
432    tr_ptrArrayFree( t->peers, NULL );
433
434    tr_free( t );
435}
436
437static Torrent*
438torrentConstructor( tr_peerMgr * manager, tr_torrent * tor )
439{
440    Torrent * t;
441
442    t = tr_new0( Torrent, 1 );
443    t->manager = manager;
444    t->tor = tor;
445    t->pool = tr_ptrArrayNew( );
446    t->peers = tr_ptrArrayNew( );
447    t->outgoingHandshakes = tr_ptrArrayNew( );
448    t->requested = tr_bitfieldNew( tor->blockCount );
449    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
450
451    return t;
452}
453
454/**
455 * For explanation, see http://www.bittorrent.org/fast_extensions.html
456 * Also see the "test-allowed-set" unit test
457 */
458struct tr_bitfield *
459tr_peerMgrGenerateAllowedSet( const uint32_t         k,         /* number of pieces in set */
460                              const uint32_t         sz,        /* number of pieces in torrent */
461                              const uint8_t        * infohash,  /* torrent's SHA1 hash*/
462                              const struct in_addr * ip )       /* peer's address */
463{
464    uint8_t w[SHA_DIGEST_LENGTH + 4];
465    uint8_t x[SHA_DIGEST_LENGTH];
466    tr_bitfield_t * a;
467    uint32_t a_size;
468
469    *(uint32_t*)w = ntohl( htonl(ip->s_addr) & 0xffffff00 );   /* (1) */
470    memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );              /* (2) */
471    tr_sha1( x, w, sizeof( w ), NULL );                        /* (3) */
472
473    a = tr_bitfieldNew( sz );
474    a_size = 0;
475   
476    while( a_size < k )
477    {
478        int i;
479        for ( i=0; i<5 && a_size<k; ++i )                      /* (4) */
480        {
481            uint32_t j = i * 4;                                /* (5) */
482            uint32_t y = ntohl(*(uint32_t*)(x+j));             /* (6) */
483            uint32_t index = y % sz;                           /* (7) */
484            if ( !tr_bitfieldHas( a, index ) ) {               /* (8) */
485                tr_bitfieldAdd( a, index );                    /* (9) */
486                ++a_size;
487            }
488        }
489        tr_sha1( x, x, sizeof( x ), NULL );                    /* (3) */
490    }
491   
492    return a;
493}
494
495tr_peerMgr*
496tr_peerMgrNew( tr_handle * handle )
497{
498    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
499    m->handle = handle;
500    m->torrents = tr_ptrArrayNew( );
501    m->incomingHandshakes = tr_ptrArrayNew( );
502    return m;
503}
504
505void
506tr_peerMgrFree( tr_peerMgr * manager )
507{
508    managerLock( manager );
509
510    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
511     * the item from manager->handshakes, so this is a little roundabout... */
512    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
513        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
514    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
515
516    /* free the torrents. */
517    tr_ptrArrayFree( manager->torrents, (PtrArrayForeachFunc)torrentDestructor );
518
519    managerUnlock( manager );
520    tr_free( manager );
521}
522
523static tr_peer**
524getConnectedPeers( Torrent * t, int * setmeCount )
525{
526    int i, peerCount, connectionCount;
527    tr_peer **peers;
528    tr_peer **ret;
529
530    assert( torrentIsLocked( t ) );
531
532    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
533    ret = tr_new( tr_peer*, peerCount );
534
535    for( i=connectionCount=0; i<peerCount; ++i )
536        if( peers[i]->msgs != NULL )
537            ret[connectionCount++] = peers[i];
538
539    *setmeCount = connectionCount;
540    return ret;
541}
542
543static int
544clientIsDownloadingFrom( const tr_peer * peer )
545{
546    return peer->clientIsInterested && !peer->clientIsChoked;
547}
548
549static int
550clientIsUploadingTo( const tr_peer * peer )
551{
552    return peer->peerIsInterested && !peer->peerIsChoked;
553}
554
555/***
556****
557***/
558
559int
560tr_peerMgrPeerIsSeed( const tr_peerMgr       * mgr,
561                      const uint8_t          * torrentHash,
562                      const struct in_addr   * addr )
563{
564    int isSeed = FALSE;
565    const Torrent * t = NULL;
566    const struct peer_atom * atom = NULL;
567
568    t = getExistingTorrent( (tr_peerMgr*)mgr, torrentHash );
569    if( t )
570        atom = getExistingAtom( t, addr );
571    if( atom )
572        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
573
574    return isSeed;
575}
576
577/***
578****  Refill
579***/
580
581struct tr_refill_piece
582{
583    tr_priority_t priority;
584    int missingBlockCount;
585    uint16_t random;
586    uint32_t piece;
587    uint32_t peerCount;
588};
589
590static int
591compareRefillPiece (const void * aIn, const void * bIn)
592{
593    const struct tr_refill_piece * a = aIn;
594    const struct tr_refill_piece * b = bIn;
595
596    /* fewer missing pieces goes first */
597    if( a->missingBlockCount != b->missingBlockCount )
598        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
599   
600    /* if one piece has a higher priority, it goes first */
601    if( a->priority != b->priority )
602        return a->priority > b->priority ? -1 : 1;
603   
604    /* otherwise if one has fewer peers, it goes first */
605    if (a->peerCount != b->peerCount)
606        return a->peerCount < b->peerCount ? -1 : 1;
607
608    /* otherwise go with our random seed */
609    return tr_compareUint16( a->random, b->random );
610}
611
612static int
613isPieceInteresting( const tr_torrent  * tor,
614                    tr_piece_index_t    piece )
615{
616    if( tor->info.pieces[piece].dnd ) /* we don't want it */
617        return 0;
618
619    if( tr_cpPieceIsComplete( tor->completion, piece ) ) /* we have it */
620        return 0;
621
622    return 1;
623}
624
625static uint32_t*
626getPreferredPieces( Torrent     * t,
627                    uint32_t    * pieceCount )
628{
629    const tr_torrent * tor = t->tor;
630    const tr_info * inf = &tor->info;
631    tr_piece_index_t i;
632    uint32_t poolSize = 0;
633    uint32_t * pool = tr_new( uint32_t, inf->pieceCount );
634    int peerCount;
635    tr_peer** peers;
636
637    assert( torrentIsLocked( t ) );
638
639    peers = getConnectedPeers( t, &peerCount );
640
641    for( i=0; i<inf->pieceCount; ++i )
642        if( isPieceInteresting( tor, i ) )
643            pool[poolSize++] = i;
644
645    /* sort the pool from most interesting to least... */
646    if( poolSize > 1 )
647    {
648        uint32_t j;
649        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
650
651        for( j=0; j<poolSize; ++j )
652        {
653            int k;
654            const int piece = pool[j];
655            struct tr_refill_piece * setme = p + j;
656
657            setme->piece = piece;
658            setme->priority = inf->pieces[piece].priority;
659            setme->peerCount = 0;
660            setme->random = tr_rand( UINT16_MAX );
661            setme->missingBlockCount = tr_cpMissingBlocksInPiece( tor->completion, piece );
662
663            for( k=0; k<peerCount; ++k ) {
664                const tr_peer * peer = peers[k];
665                if( peer->peerIsInterested && !peer->clientIsChoked && tr_bitfieldHas( peer->have, piece ) )
666                    ++setme->peerCount;
667            }
668        }
669
670        qsort( p, poolSize, sizeof(struct tr_refill_piece), compareRefillPiece );
671
672        for( j=0; j<poolSize; ++j )
673            pool[j] = p[j].piece;
674
675        tr_free( p );
676    }
677
678    tr_free( peers );
679
680    *pieceCount = poolSize;
681    return pool;
682}
683
684static uint64_t*
685getPreferredBlocks( Torrent * t, tr_block_index_t * setmeCount )
686{
687    int s;
688    uint32_t i;
689    uint32_t pieceCount;
690    uint32_t blockCount;
691    uint32_t unreqCount[3], reqCount[3];
692    uint32_t * pieces;
693    uint64_t * ret, * walk;
694    uint64_t * unreq[3], *req[3];
695    const tr_torrent * tor = t->tor;
696
697    assert( torrentIsLocked( t ) );
698
699    pieces = getPreferredPieces( t, &pieceCount );
700
701    /**
702     * Now we walk through those preferred pieces to find all the blocks
703     * are still missing from them.  We put unrequested blocks first,
704     * of course, but by including requested blocks afterwards, endgame
705     * handling happens naturally.
706     *
707     * By doing this once per priority we also effectively get an endgame
708     * mode for each priority level.  The helps keep high priority files
709     * from getting stuck at 99% due of unresponsive peers.
710     */
711
712    /* make temporary bins for the four tiers of blocks */
713    for( i=0; i<3; ++i ) {
714        req[i] = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
715        reqCount[i] = 0;
716        unreq[i] = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
717        unreqCount[i] = 0;
718    }
719
720    /* sort the blocks into our temp bins */
721    for( i=blockCount=0; i<pieceCount; ++i )
722    {
723        const tr_piece_index_t index = pieces[i];
724        const int priorityIndex = tor->info.pieces[index].priority + 1;
725        const tr_block_index_t begin = tr_torPieceFirstBlock( tor, index );
726        const tr_block_index_t end = begin + tr_torPieceCountBlocks( tor, index );
727        tr_block_index_t block;
728
729        for( block=begin; block<end; ++block )
730        {
731            if( tr_cpBlockIsComplete( tor->completion, block ) )
732                continue;
733
734            ++blockCount;
735
736            if( tr_bitfieldHas( t->requested, block ) )
737            {
738                const uint32_t n = reqCount[priorityIndex]++;
739                req[priorityIndex][n] = block;
740            }
741            else
742            {
743                const uint32_t n = unreqCount[priorityIndex]++;
744                unreq[priorityIndex][n] = block;
745            }
746        }
747    }
748
749    /* join the bins together, going from highest priority to lowest so
750     * the the blocks we want to request first will be first in the list */
751    ret = walk = tr_new( uint64_t, blockCount );
752    for( s=2; s>=0; --s ) {
753        memcpy( walk, unreq[s], sizeof(uint64_t) * unreqCount[s] );
754        walk += unreqCount[s];
755        memcpy( walk, req[s], sizeof(uint64_t) * reqCount[s] );
756        walk += reqCount[s];
757    }
758    assert( ( walk - ret ) == ( int )blockCount );
759    *setmeCount = blockCount;
760
761    /* cleanup */
762    tr_free( pieces );
763    for( i=0; i<3; ++i ) {
764        tr_free( unreq[i] );
765        tr_free( req[i] );
766    }
767    return ret;
768}
769
770static tr_peer**
771getPeersUploadingToClient( Torrent * t, int * setmeCount )
772{
773    int i;
774    int peerCount = 0;
775    int retCount = 0;
776    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
777    tr_peer ** ret = tr_new( tr_peer*, peerCount );
778
779    /* get a list of peers we're downloading from */
780    for( i=0; i<peerCount; ++i )
781        if( clientIsDownloadingFrom( peers[i] ) )
782            ret[retCount++] = peers[i];
783
784    /* pick a different starting point each time so all peers
785     * get a chance at the first blocks in the queue */
786    if( retCount ) {
787        tr_peer ** tmp = tr_new( tr_peer*, retCount );
788        i = tr_rand( retCount );
789        memcpy( tmp, ret, sizeof(tr_peer*) * retCount );
790        memcpy( ret, tmp+i, sizeof(tr_peer*) * (retCount-i) );
791        memcpy( ret+(retCount-i), tmp, sizeof(tr_peer*) * i );
792        tr_free( tmp );
793    }
794
795    *setmeCount = retCount;
796    return ret;
797}
798
799static int
800refillPulse( void * vtorrent )
801{
802    Torrent * t = vtorrent;
803    tr_torrent * tor = t->tor;
804    tr_block_index_t i;
805    int peerCount;
806    tr_peer ** peers;
807    tr_block_index_t blockCount;
808    uint64_t * blocks;
809
810    if( !t->isRunning )
811        return TRUE;
812    if( tr_torrentIsSeed( t->tor ) )
813        return TRUE;
814
815    torrentLock( t );
816    tordbg( t, "Refilling Request Buffers..." );
817
818    blocks = getPreferredBlocks( t, &blockCount );
819    peers = getPeersUploadingToClient( t, &peerCount );
820
821    for( i=0; peerCount && i<blockCount; ++i )
822    {
823        int j;
824
825        const tr_block_index_t block = blocks[i];
826        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
827        const uint32_t begin = (block * tor->blockSize) - (index * tor->info.pieceSize);
828        const uint32_t length = tr_torBlockCountBytes( tor, block );
829
830        assert( tr_torrentReqIsValid( tor, index, begin, length ) );
831        assert( _tr_block( tor, index, begin ) == block );
832        assert( begin < tr_torPieceCountBytes( tor, index ) );
833        assert( (begin + length) <= tr_torPieceCountBytes( tor, index ) );
834
835        /* find a peer who can ask for this block */
836        for( j=0; j<peerCount; )
837        {
838            const int val = tr_peerMsgsAddRequest( peers[j]->msgs, index, begin, length );
839            switch( val )
840            {
841                case TR_ADDREQ_FULL: 
842                case TR_ADDREQ_CLIENT_CHOKED:
843                    memmove( peers+j, peers+j+1, sizeof(tr_peer*)*(--peerCount-j) );
844                    break;
845
846                case TR_ADDREQ_MISSING: 
847                case TR_ADDREQ_DUPLICATE: 
848                    ++j;
849                    break;
850
851                case TR_ADDREQ_OK:
852                    tr_bitfieldAdd( t->requested, block );
853                    j = peerCount;
854                    break;
855
856                default:
857                    assert( 0 && "unhandled value" );
858                    break;
859            }
860        }
861    }
862
863    /* cleanup */
864    tr_free( peers );
865    tr_free( blocks );
866
867    t->refillTimer = NULL;
868    torrentUnlock( t );
869    return FALSE;
870}
871
872static void
873broadcastClientHave( Torrent * t, uint32_t index )
874{
875    int i, size;
876    tr_peer ** peers;
877
878    assert( torrentIsLocked( t ) );
879
880    peers = getConnectedPeers( t, &size );
881    for( i=0; i<size; ++i )
882        tr_peerMsgsHave( peers[i]->msgs, index );
883    tr_free( peers );
884}
885
886static void
887broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
888{
889    int i, size;
890    tr_peer ** peers;
891
892    assert( torrentIsLocked( t ) );
893
894    peers = getConnectedPeers( t, &size );
895    for( i=0; i<size; ++i )
896        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
897    tr_free( peers );
898}
899
900static void
901addStrike( Torrent * t, tr_peer * peer )
902{
903    tordbg( t, "increasing peer %s strike count to %d", tr_peerIoAddrStr(&peer->in_addr,peer->port), peer->strikes+1 );
904
905    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
906    {
907        struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
908        atom->myflags |= MYFLAG_BANNED;
909        peer->doPurge = 1;
910        tordbg( t, "banning peer %s", tr_peerIoAddrStr(&atom->addr,atom->port) );
911    }
912}
913
914static void
915msgsCallbackFunc( void * vpeer, void * vevent, void * vt )
916{
917    tr_peer * peer = vpeer;
918    Torrent * t = (Torrent *) vt;
919    const tr_peermsgs_event * e = (const tr_peermsgs_event *) vevent;
920
921    torrentLock( t );
922
923    switch( e->eventType )
924    {
925        case TR_PEERMSG_NEED_REQ:
926            if( t->refillTimer == NULL )
927                t->refillTimer = tr_timerNew( t->manager->handle,
928                                              refillPulse, t,
929                                              REFILL_PERIOD_MSEC );
930            break;
931
932        case TR_PEERMSG_CANCEL:
933            tr_bitfieldRem( t->requested, _tr_block( t->tor, e->pieceIndex, e->offset ) );
934            break;
935
936        case TR_PEERMSG_PIECE_DATA: {
937            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
938            atom->piece_data_time = time( NULL );
939            break;
940        }
941
942        case TR_PEERMSG_CLIENT_HAVE:
943            broadcastClientHave( t, e->pieceIndex );
944            tr_torrentRecheckCompleteness( t->tor );
945            break;
946
947        case TR_PEERMSG_PEER_PROGRESS: {
948            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
949            const int peerIsSeed = e->progress >= 1.0;
950            if( peerIsSeed ) {
951                tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr(&atom->addr,atom->port) );
952                atom->flags |= ADDED_F_SEED_FLAG;
953            } else {
954                tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr(&atom->addr,atom->port) );
955                atom->flags &= ~ADDED_F_SEED_FLAG;
956            } break;
957        }
958
959        case TR_PEERMSG_CLIENT_BLOCK:
960            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
961            break;
962
963        case TR_PEERMSG_ERROR:
964            if( TR_ERROR_IS_IO( e->err ) ) {
965                t->tor->error = e->err;
966                strlcpy( t->tor->errorString, tr_errorString( e->err ), sizeof(t->tor->errorString) );
967                tr_torrentStop( t->tor );
968            } else if( e->err == TR_ERROR_ASSERT ) {
969                addStrike( t, peer );
970            }
971            peer->doPurge = 1;
972            break;
973
974        default:
975            assert(0);
976    }
977
978    torrentUnlock( t );
979}
980
981static void
982ensureAtomExists( Torrent * t, const struct in_addr * addr, uint16_t port, uint8_t flags, uint8_t from )
983{
984    if( getExistingAtom( t, addr ) == NULL )
985    {
986        struct peer_atom * a;
987        a = tr_new0( struct peer_atom, 1 );
988        a->addr = *addr;
989        a->port = port;
990        a->flags = flags;
991        a->from = from;
992        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr(&a->addr,a->port) );
993        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
994    }
995}
996
997static int
998getMaxPeerCount( const tr_torrent * tor UNUSED )
999{
1000    return tor->maxConnectedPeers;
1001}
1002
1003/* FIXME: this is kind of a mess. */
1004static void
1005myHandshakeDoneCB( tr_handshake    * handshake,
1006                   tr_peerIo       * io,
1007                   int               isConnected,
1008                   const uint8_t   * peer_id,
1009                   void            * vmanager )
1010{
1011    int ok = isConnected;
1012    uint16_t port;
1013    const struct in_addr * addr;
1014    tr_peerMgr * manager = (tr_peerMgr*) vmanager;
1015    Torrent * t;
1016    tr_handshake * ours;
1017
1018    assert( io != NULL );
1019    assert( isConnected==0 || isConnected==1 );
1020
1021    t = tr_peerIoHasTorrentHash( io )
1022        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1023        : NULL;
1024
1025    if( tr_peerIoIsIncoming ( io ) )
1026        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
1027                                        handshake, handshakeCompare );
1028    else if( t != NULL )
1029        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
1030                                        handshake, handshakeCompare );
1031    else
1032        ours = handshake;
1033
1034    assert( ours != NULL );
1035    assert( ours == handshake );
1036
1037    if( t != NULL )
1038        torrentLock( t );
1039
1040    addr = tr_peerIoGetAddress( io, &port );
1041
1042    if( !ok || !t || !t->isRunning )
1043    {
1044        if( t ) {
1045            struct peer_atom * atom = getExistingAtom( t, addr );
1046            if( atom )
1047                ++atom->numFails;
1048        }
1049
1050        tr_peerIoFree( io );
1051    }
1052    else /* looking good */
1053    {
1054        struct peer_atom * atom;
1055        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1056        atom = getExistingAtom( t, addr );
1057
1058        if( atom->myflags & MYFLAG_BANNED )
1059        {
1060            tordbg( t, "banned peer %s tried to reconnect", tr_peerIoAddrStr(&atom->addr,atom->port) );
1061            tr_peerIoFree( io );
1062        }
1063        else if( tr_ptrArraySize( t->peers ) >= getMaxPeerCount( t->tor ) )
1064        {
1065            tr_peerIoFree( io );
1066        }
1067        else
1068        {
1069            tr_peer * peer = getExistingPeer( t, addr );
1070
1071            if( peer != NULL ) /* we already have this peer */
1072            {
1073                tr_peerIoFree( io );
1074            }
1075            else
1076            {
1077                peer = getPeer( t, addr );
1078                tr_free( peer->client );
1079                peer->client = peer_id ? tr_clientForId( peer_id ) : NULL;
1080                peer->port = port;
1081                peer->io = io;
1082                peer->msgs = tr_peerMsgsNew( t->tor, peer, msgsCallbackFunc, t, &peer->msgsTag );
1083                atom->time = time( NULL );
1084            }
1085        }
1086    }
1087
1088    if( t != NULL )
1089        torrentUnlock( t );
1090}
1091
1092void
1093tr_peerMgrAddIncoming( tr_peerMgr      * manager,
1094                       struct in_addr  * addr,
1095                       uint16_t          port,
1096                       int               socket )
1097{
1098    managerLock( manager );
1099
1100    if( tr_blocklistHasAddress( manager->handle, addr ) )
1101    {
1102        tr_dbg( "Banned IP address \"%s\" tried to connect to us",
1103                inet_ntoa( *addr ) );
1104        tr_netClose( socket );
1105    }
1106    else if( getExistingHandshake( manager->incomingHandshakes, addr ) )
1107    {
1108        tr_netClose( socket );
1109    }
1110    else /* we don't have a connetion to them yet... */
1111    {
1112        tr_peerIo * io;
1113        tr_handshake * handshake;
1114
1115        tordbg( NULL, "Got an INCOMING connection with %s", tr_peerIoAddrStr( addr, port ) );
1116
1117        io = tr_peerIoNewIncoming( manager->handle, addr, port, socket );
1118
1119        handshake = tr_handshakeNew( io,
1120                                     manager->handle->encryptionMode,
1121                                     myHandshakeDoneCB,
1122                                     manager );
1123
1124        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake, handshakeCompare );
1125    }
1126
1127    managerUnlock( manager );
1128}
1129
1130void
1131tr_peerMgrAddPex( tr_peerMgr     * manager,
1132                  const uint8_t  * torrentHash,
1133                  uint8_t          from,
1134                  const tr_pex   * pex )
1135{
1136    Torrent * t;
1137    managerLock( manager );
1138
1139    t = getExistingTorrent( manager, torrentHash );
1140    if( !tr_blocklistHasAddress( t->manager->handle, &pex->in_addr ) )
1141        ensureAtomExists( t, &pex->in_addr, pex->port, pex->flags, from );
1142
1143    managerUnlock( manager );
1144}
1145
1146tr_pex *
1147tr_peerMgrCompactToPex( const void  * compact,
1148                        size_t        compactLen,
1149                        const char  * added_f,
1150                        size_t      * pexCount )
1151{
1152    size_t i;
1153    size_t n = compactLen / 6;
1154    const uint8_t * walk = compact;
1155    tr_pex * pex = tr_new0( tr_pex, n );
1156    for( i=0; i<n; ++i ) {
1157        memcpy( &pex[i].in_addr, walk, 4 ); walk += 4;
1158        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1159        if( added_f )
1160            pex[i].flags = added_f[i];
1161    }
1162    *pexCount = n;
1163    return pex;
1164}
1165
1166/**
1167***
1168**/
1169
1170void
1171tr_peerMgrSetBlame( tr_peerMgr     * manager,
1172                    const uint8_t  * torrentHash,
1173                    int              pieceIndex,
1174                    int              success )
1175{
1176    if( !success )
1177    {
1178        int peerCount, i;
1179        Torrent * t = getExistingTorrent( manager, torrentHash );
1180        tr_peer ** peers;
1181
1182        assert( torrentIsLocked( t ) );
1183
1184        peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1185        for( i=0; i<peerCount; ++i )
1186        {
1187            tr_peer * peer = peers[i];
1188            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1189            {
1190                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1191                           tr_peerIoAddrStr(&peer->in_addr,peer->port),
1192                           pieceIndex, (int)peer->strikes+1 );
1193                addStrike( t, peer );
1194            }
1195        }
1196    }
1197}
1198
1199int
1200tr_pexCompare( const void * va, const void * vb )
1201{
1202    const tr_pex * a = (const tr_pex *) va;
1203    const tr_pex * b = (const tr_pex *) vb;
1204    int i = memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
1205    if( i ) return i;
1206    if( a->port < b->port ) return -1;
1207    if( a->port > b->port ) return 1;
1208    return 0;
1209}
1210
1211int tr_pexCompare( const void * a, const void * b );
1212
1213static int
1214peerPrefersCrypto( const tr_peer * peer )
1215{
1216    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1217        return TRUE;
1218
1219    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1220        return FALSE;
1221
1222    return tr_peerIoIsEncrypted( peer->io );
1223};
1224
1225int
1226tr_peerMgrGetPeers( tr_peerMgr      * manager,
1227                    const uint8_t   * torrentHash,
1228                    tr_pex         ** setme_pex )
1229{
1230    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1231    int i, peerCount;
1232    const tr_peer ** peers;
1233    tr_pex * pex;
1234    tr_pex * walk;
1235
1236    managerLock( manager );
1237
1238    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1239    pex = walk = tr_new( tr_pex, peerCount );
1240
1241    for( i=0; i<peerCount; ++i, ++walk )
1242    {
1243        const tr_peer * peer = peers[i];
1244
1245        walk->in_addr = peer->in_addr;
1246
1247        walk->port = peer->port;
1248
1249        walk->flags = 0;
1250        if( peerPrefersCrypto(peer) )  walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1251        if( peer->progress >= 1.0 )    walk->flags |= ADDED_F_SEED_FLAG;
1252    }
1253
1254    assert( ( walk - pex ) == peerCount );
1255    qsort( pex, peerCount, sizeof(tr_pex), tr_pexCompare );
1256    *setme_pex = pex;
1257
1258    managerUnlock( manager );
1259
1260    return peerCount;
1261}
1262
1263static int reconnectPulse( void * vtorrent );
1264static int rechokePulse( void * vtorrent );
1265static int swiftPulse( void * vtorrent );
1266
1267void
1268tr_peerMgrStartTorrent( tr_peerMgr     * manager,
1269                        const uint8_t  * torrentHash )
1270{
1271    Torrent * t;
1272
1273    managerLock( manager );
1274
1275    t = getExistingTorrent( manager, torrentHash );
1276
1277    assert( t != NULL );
1278    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1279    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1280    assert( ( t->isRunning != 0 ) == ( t->swiftTimer != NULL ) );
1281
1282    if( !t->isRunning )
1283    {
1284        t->isRunning = 1;
1285
1286        t->reconnectTimer = tr_timerNew( t->manager->handle,
1287                                         reconnectPulse, t,
1288                                         RECONNECT_PERIOD_MSEC );
1289
1290        t->rechokeTimer = tr_timerNew( t->manager->handle,
1291                                       rechokePulse, t,
1292                                       RECHOKE_PERIOD_MSEC );
1293
1294        t->swiftTimer = tr_timerNew( t->manager->handle,
1295                                     swiftPulse, t,
1296                                     SWIFT_PERIOD_MSEC );
1297
1298        reconnectPulse( t );
1299
1300        rechokePulse( t );
1301
1302        swiftPulse( t );
1303    }
1304
1305    managerUnlock( manager );
1306}
1307
1308static void
1309stopTorrent( Torrent * t )
1310{
1311    assert( torrentIsLocked( t ) );
1312
1313    t->isRunning = 0;
1314    tr_timerFree( &t->rechokeTimer );
1315    tr_timerFree( &t->reconnectTimer );
1316    tr_timerFree( &t->swiftTimer );
1317
1318    /* disconnect the peers. */
1319    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)peerDestructor );
1320    tr_ptrArrayClear( t->peers );
1321
1322    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1323     * which removes the handshake from t->outgoingHandshakes... */
1324    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1325        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1326}
1327void
1328tr_peerMgrStopTorrent( tr_peerMgr     * manager,
1329                       const uint8_t  * torrentHash)
1330{
1331    managerLock( manager );
1332
1333    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1334
1335    managerUnlock( manager );
1336}
1337
1338void
1339tr_peerMgrAddTorrent( tr_peerMgr * manager,
1340                      tr_torrent * tor )
1341{
1342    Torrent * t;
1343
1344    managerLock( manager );
1345
1346    assert( tor != NULL );
1347    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1348
1349    t = torrentConstructor( manager, tor );
1350    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1351
1352    managerUnlock( manager );
1353}
1354
1355void
1356tr_peerMgrRemoveTorrent( tr_peerMgr     * manager,
1357                         const uint8_t  * torrentHash )
1358{
1359    Torrent * t;
1360
1361    managerLock( manager );
1362
1363    t = getExistingTorrent( manager, torrentHash );
1364    assert( t != NULL );
1365    stopTorrent( t );
1366    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1367    torrentDestructor( t );
1368
1369    managerUnlock( manager );
1370}
1371
1372void
1373tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1374                               const uint8_t    * torrentHash,
1375                               int8_t           * tab,
1376                               int                tabCount )
1377{
1378    int i;
1379    const Torrent * t;
1380    const tr_torrent * tor;
1381    float interval;
1382
1383    managerLock( (tr_peerMgr*)manager );
1384
1385    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1386    tor = t->tor;
1387    interval = tor->info.pieceCount / (float)tabCount;
1388
1389    memset( tab, 0, tabCount );
1390
1391    for( i=0; i<tabCount; ++i )
1392    {
1393        const int piece = i * interval;
1394
1395        if( tor == NULL )
1396            tab[i] = 0;
1397        else if( tr_cpPieceIsComplete( tor->completion, piece ) )
1398            tab[i] = -1;
1399        else {
1400            int j, peerCount;
1401            const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1402            for( j=0; j<peerCount; ++j )
1403                if( tr_bitfieldHas( peers[j]->have, i ) )
1404                    ++tab[i];
1405        }
1406    }
1407
1408    managerUnlock( (tr_peerMgr*)manager );
1409}
1410
1411/* Returns the pieces that are available from peers */
1412tr_bitfield*
1413tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1414                        const uint8_t    * torrentHash )
1415{
1416    int i, size;
1417    Torrent * t;
1418    tr_peer ** peers;
1419    tr_bitfield * pieces;
1420    managerLock( (tr_peerMgr*)manager );
1421
1422    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1423    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1424    peers = getConnectedPeers( t, &size );
1425    for( i=0; i<size; ++i )
1426        tr_bitfieldOr( pieces, peers[i]->have );
1427
1428    managerUnlock( (tr_peerMgr*)manager );
1429    tr_free( peers );
1430    return pieces;
1431}
1432
1433int
1434tr_peerMgrHasConnections( const tr_peerMgr * manager,
1435                          const uint8_t    * torrentHash )
1436{
1437    int ret;
1438    const Torrent * t;
1439    managerLock( (tr_peerMgr*)manager );
1440
1441    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1442    ret = t && tr_ptrArraySize( t->peers );
1443
1444    managerUnlock( (tr_peerMgr*)manager );
1445    return ret;
1446}
1447
1448void
1449tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1450                        const uint8_t    * torrentHash,
1451                        int              * setmePeersKnown,
1452                        int              * setmePeersConnected,
1453                        int              * setmePeersSendingToUs,
1454                        int              * setmePeersGettingFromUs,
1455                        int              * setmePeersFrom )
1456{
1457    int i, size;
1458    const Torrent * t;
1459    const tr_peer ** peers;
1460
1461    managerLock( (tr_peerMgr*)manager );
1462
1463    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1464    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1465
1466    *setmePeersKnown          = tr_ptrArraySize( t->pool );
1467    *setmePeersConnected      = 0;
1468    *setmePeersSendingToUs    = 0;
1469    *setmePeersGettingFromUs  = 0;
1470
1471    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1472        setmePeersFrom[i] = 0;
1473
1474    for( i=0; i<size; ++i )
1475    {
1476        const tr_peer * peer = peers[i];
1477        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1478
1479        if( peer->io == NULL ) /* not connected */
1480            continue;
1481
1482        ++*setmePeersConnected;
1483
1484        ++setmePeersFrom[atom->from];
1485
1486        if( clientIsDownloadingFrom( peer ) )
1487            ++*setmePeersSendingToUs;
1488
1489        if( clientIsUploadingTo( peer ) )
1490            ++*setmePeersGettingFromUs;
1491    }
1492
1493    managerUnlock( (tr_peerMgr*)manager );
1494}
1495
1496struct tr_peer_stat *
1497tr_peerMgrPeerStats( const tr_peerMgr  * manager,
1498                     const uint8_t     * torrentHash,
1499                     int               * setmeCount UNUSED )
1500{
1501    int i, size;
1502    const Torrent * t;
1503    tr_peer ** peers;
1504    tr_peer_stat * ret;
1505
1506    assert( manager != NULL );
1507    managerLock( (tr_peerMgr*)manager );
1508
1509    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1510    peers = getConnectedPeers( (Torrent*)t, &size );
1511    ret = tr_new0( tr_peer_stat, size );
1512
1513    for( i=0; i<size; ++i )
1514    {
1515        char * pch;
1516        const tr_peer * peer = peers[i];
1517        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1518        tr_peer_stat * stat = ret + i;
1519
1520        tr_netNtop( &peer->in_addr, stat->addr, sizeof(stat->addr) );
1521        strlcpy( stat->client, (peer->client ? peer->client : ""), sizeof(stat->client) );
1522        stat->port               = peer->port;
1523        stat->from               = atom->from;
1524        stat->progress           = peer->progress;
1525        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1526        stat->uploadToRate       = peer->rateToPeer;
1527        stat->downloadFromRate   = peer->rateToClient;
1528        stat->peerIsChoked       = peer->peerIsChoked;
1529        stat->peerIsInterested   = peer->peerIsInterested;
1530        stat->clientIsChoked     = peer->clientIsChoked;
1531        stat->clientIsInterested = peer->clientIsInterested;
1532        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1533        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1534        stat->isUploadingTo      = clientIsUploadingTo( peer );
1535
1536        pch = stat->flagStr;
1537        if( t->optimistic == peer ) *pch++ = 'O';
1538        if( stat->isDownloadingFrom ) *pch++ = 'D';
1539        else if( stat->clientIsInterested ) *pch++ = 'd';
1540        if( stat->isUploadingTo ) *pch++ = 'U';
1541        else if( stat->peerIsInterested ) *pch++ = 'u';
1542        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
1543        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1544        if( stat->isEncrypted ) *pch++ = 'E';
1545        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1546        if( stat->isIncoming ) *pch++ = 'I';
1547        *pch = '\0';
1548    }
1549
1550    *setmeCount = size;
1551    tr_free( peers );
1552
1553    managerUnlock( (tr_peerMgr*)manager );
1554    return ret;
1555}
1556
1557/**
1558***
1559**/
1560
1561struct ChokeData
1562{
1563    uint8_t doUnchoke;
1564    uint8_t isInterested;
1565    uint32_t rate;
1566    tr_peer * peer;
1567};
1568
1569static int
1570compareChoke( const void * va, const void * vb )
1571{
1572    const struct ChokeData * a = va;
1573    const struct ChokeData * b = vb;
1574    return -tr_compareUint32( a->rate, b->rate );
1575}
1576
1577static int
1578isNew( const tr_peer * peer )
1579{
1580    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1581}
1582
1583static int
1584isSame( const tr_peer * peer )
1585{
1586    return peer && peer->client && strstr( peer->client, "Transmission" );
1587}
1588
1589/**
1590***
1591**/
1592
1593static double
1594getWeightedRate( const tr_peer * peer, int clientIsSeed )
1595{
1596    return (int)( 10.0 * ( clientIsSeed ? peer->rateToPeer
1597                                        : peer->rateToClient ) );
1598}
1599
1600static void
1601rechoke( Torrent * t )
1602{
1603    int i, n, peerCount, size, unchokedInterested;
1604    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1605    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1606    const int clientIsSeed = tr_torrentIsSeed( t->tor );
1607
1608    assert( torrentIsLocked( t ) );
1609   
1610    /* sort the peers by preference and rate */
1611    for( i=0, size=0; i<peerCount; ++i )
1612    {
1613        tr_peer * peer = peers[i];
1614        if( peer->progress >= 1.0 ) /* choke all seeds */
1615            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1616        else {
1617            struct ChokeData * node = &choke[size++];
1618            node->peer = peer;
1619            node->isInterested = peer->peerIsInterested;
1620            node->rate = getWeightedRate( peer, clientIsSeed );
1621        }
1622    }
1623
1624    qsort( choke, size, sizeof(struct ChokeData), compareChoke );
1625
1626    /**
1627     * Reciprocation and number of uploads capping is managed by unchoking
1628     * the N peers which have the best upload rate and are interested.
1629     * This maximizes the client's download rate. These N peers are
1630     * referred to as downloaders, because they are interested in downloading
1631     * from the client.
1632     *
1633     * Peers which have a better upload rate (as compared to the downloaders)
1634     * but aren't interested get unchoked. If they become interested, the
1635     * downloader with the worst upload rate gets choked. If a client has
1636     * a complete file, it uses its upload rate rather than its download
1637     * rate to decide which peers to unchoke.
1638     */
1639    unchokedInterested = 0;
1640    for( i=0; i<size && unchokedInterested<MAX_UNCHOKED_PEERS; ++i ) {
1641        choke[i].doUnchoke = 1;
1642        if( choke[i].isInterested )
1643            ++unchokedInterested;
1644    }
1645    n = i;
1646    while( i<size )
1647        choke[i++].doUnchoke = 0;
1648
1649    /* optimistic unchoke */
1650    if( i < size )
1651    {
1652        struct ChokeData * c;
1653        tr_ptrArray * randPool = tr_ptrArrayNew( );
1654        for( ; i<size; ++i )
1655        {
1656            const tr_peer * peer = choke[i].peer;
1657            int x=1, y;
1658            if( isNew( peer ) ) x *= 3;
1659            if( isSame( peer ) ) x *= 3;
1660            for( y=0; y<x; ++y )
1661                tr_ptrArrayAppend( randPool, choke );
1662        }
1663        i = tr_rand( tr_ptrArraySize( randPool ) );
1664        c = ( struct ChokeData* )tr_ptrArrayNth( randPool, i);
1665        c->doUnchoke = 1;
1666        t->optimistic = c->peer;
1667        tr_ptrArrayFree( randPool, NULL );
1668    }
1669
1670    for( i=0; i<size; ++i )
1671        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1672
1673    /* cleanup */
1674    tr_free( choke );
1675    tr_free( peers );
1676}
1677
1678static int
1679rechokePulse( void * vtorrent )
1680{
1681    Torrent * t = vtorrent;
1682    torrentLock( t );
1683    rechoke( t );
1684    torrentUnlock( t );
1685    return TRUE;
1686}
1687
1688/***
1689****
1690***/
1691
1692static int
1693swiftPulse( void * vtorrent )
1694{
1695    Torrent * t = vtorrent;
1696    torrentLock( t );
1697
1698    if( !tr_torrentIsSeed( t->tor ) )
1699    {
1700        int i;
1701        int peerCount = 0;
1702        int deadbeatCount = 0;
1703        tr_peer ** peers = getConnectedPeers( t, &peerCount );
1704        tr_peer ** deadbeats = tr_new( tr_peer*, peerCount );
1705
1706        const double ul_KiBsec = tr_rcRate( t->tor->upload );
1707        const double ul_KiB = ul_KiBsec * (SWIFT_PERIOD_MSEC/1000.0);
1708        const double ul_bytes = ul_KiB * 1024;
1709        const double freeCreditTotal = ul_bytes * SWIFT_LARGESSE;
1710        int freeCreditPerPeer;
1711
1712        for( i=0; i<peerCount; ++i ) {
1713            tr_peer * peer = peers[i];
1714            if( peer->credit <= 0 )
1715                deadbeats[deadbeatCount++] =  peer;
1716        }
1717
1718        freeCreditPerPeer = (int)( freeCreditTotal / deadbeatCount );
1719        for( i=0; i<deadbeatCount; ++i )
1720            deadbeats[i]->credit = freeCreditPerPeer;
1721
1722        tordbg( t, "%d deadbeats, "
1723            "who are each being granted %d bytes' credit "
1724            "for a total of %.1f KiB, "
1725            "%d%% of the torrent's ul speed %.1f\n",
1726            deadbeatCount, freeCreditPerPeer,
1727            ul_KiBsec*SWIFT_LARGESSE, (int)(SWIFT_LARGESSE*100), ul_KiBsec );
1728
1729        tr_free( deadbeats );
1730        tr_free( peers );
1731    }
1732
1733    torrentUnlock( t );
1734    return TRUE;
1735}
1736
1737/***
1738****
1739****  Life and Death
1740****
1741***/
1742
1743static int
1744shouldPeerBeClosed( const Torrent * t, const tr_peer * peer, int peerCount )
1745{
1746    const tr_torrent * tor = t->tor;
1747    const time_t now = time( NULL );
1748    const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1749
1750    /* if it's marked for purging, close it */
1751    if( peer->doPurge ) {
1752        tordbg( t, "purging peer %s because its doPurge flag is set", tr_peerIoAddrStr(&atom->addr,atom->port) );
1753        return TRUE;
1754    }
1755
1756    /* if we're both seeds and it's been long enough for a pex exchange, close it */
1757    if( 1 ) {
1758        const int clientIsSeed = tr_torrentIsSeed( tor );
1759        const int peerIsSeed = atom->flags & ADDED_F_SEED_FLAG;
1760        if( peerIsSeed && clientIsSeed && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30) ) ) {
1761            tordbg( t, "purging peer %s because we're both seeds", tr_peerIoAddrStr(&atom->addr,atom->port) );
1762            return TRUE;
1763        }
1764    }
1765
1766    /* disconnect if it's been too long since piece data has been transferred.
1767     * this is on a sliding scale based on number of available peers... */
1768    if( 1 ) {
1769        const int relaxStrictnessIfFewerThanN = (int)((getMaxPeerCount(tor) * 0.9) + 0.5);
1770        /* if we have >= relaxIfFewerThan, strictness is 100%.
1771         * if we have zero connections, strictness is 0% */
1772        const double strictness = peerCount >= relaxStrictnessIfFewerThanN
1773            ? 1.0
1774            : peerCount / (double)relaxStrictnessIfFewerThanN;
1775        const int lo = MIN_UPLOAD_IDLE_SECS;
1776        const int hi = MAX_UPLOAD_IDLE_SECS;
1777        const int limit = lo + ((hi-lo) * strictness);
1778        const time_t then = peer->pieceDataActivityDate;
1779        const int idleTime = then ? (now-then) : 0;
1780        if( idleTime > limit ) {
1781            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
1782                       tr_peerIoAddrStr(&atom->addr,atom->port), idleTime );
1783            return TRUE;
1784        }
1785    }
1786
1787    return FALSE;
1788}
1789
1790static tr_peer **
1791getPeersToClose( Torrent * t, int * setmeSize )
1792{
1793    int i, peerCount, outsize;
1794    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( t->peers, &peerCount );
1795    struct tr_peer ** ret = tr_new( tr_peer*, peerCount );
1796
1797    assert( torrentIsLocked( t ) );
1798
1799    for( i=outsize=0; i<peerCount; ++i )
1800        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
1801            ret[outsize++] = peers[i];
1802
1803    *setmeSize = outsize;
1804    return ret;
1805}
1806
1807static int
1808compareCandidates( const void * va, const void * vb )
1809{
1810    const struct peer_atom * a = * (const struct peer_atom**) va;
1811    const struct peer_atom * b = * (const struct peer_atom**) vb;
1812    int i;
1813
1814    if( a->piece_data_time > b->piece_data_time ) return -1;
1815    if( a->piece_data_time < b->piece_data_time ) return  1;
1816
1817    if(( i = tr_compareUint16( a->numFails, b->numFails )))
1818        return i;
1819
1820    if( a->time != b->time )
1821        return a->time < b->time ? -1 : 1;
1822
1823    return 0;
1824}
1825
1826static struct peer_atom **
1827getPeerCandidates( Torrent * t, int * setmeSize )
1828{
1829    int i, atomCount, retCount;
1830    struct peer_atom ** atoms;
1831    struct peer_atom ** ret;
1832    const time_t now = time( NULL );
1833    const int seed = tr_torrentIsSeed( t->tor );
1834
1835    assert( torrentIsLocked( t ) );
1836
1837    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &atomCount );
1838    ret = tr_new( struct peer_atom*, atomCount );
1839    for( i=retCount=0; i<atomCount; ++i )
1840    {
1841        struct peer_atom * atom = atoms[i];
1842
1843        /* peer fed us too much bad data ... we only keep it around
1844         * now to weed it out in case someone sends it to us via pex */
1845        if( atom->myflags & MYFLAG_BANNED )
1846            continue;
1847
1848        /* peer was unconnectable before, so we're not going to keep trying.
1849         * this is needs a separate flag from `banned', since if they try
1850         * to connect to us later, we'll let them in */
1851        if( atom->myflags & MYFLAG_UNREACHABLE )
1852            continue;
1853
1854        /* we don't need two connections to the same peer... */
1855        if( peerIsInUse( t, &atom->addr ) )
1856            continue;
1857
1858        /* no need to connect if we're both seeds... */
1859        if( seed && (atom->flags & ADDED_F_SEED_FLAG) )
1860            continue;
1861
1862        /* we're wasting our time trying to connect to this bozo. */
1863        if( atom->numFails > 3 )
1864            continue;
1865
1866        /* If we were connected to this peer recently and transferring
1867         * piece data, try to reconnect -- network troubles may have
1868         * disconnected us.  but if we weren't sharing piece data,
1869         * hold off on this peer to give another one a try instead */
1870        if( ( now - atom->piece_data_time ) > 30 )
1871        {
1872            int minWait = (60 * 10); /* ten minutes */
1873            int maxWait = (60 * 30); /* thirty minutes */
1874            int wait = atom->numFails * minWait;
1875            if( wait < minWait ) wait = minWait;
1876            if( wait > maxWait ) wait = maxWait;
1877            if( ( now - atom->time ) < wait ) {
1878                tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
1879                        i, tr_peerIoAddrStr(&atom->addr,atom->port), wait );
1880                continue;
1881            }
1882        }
1883
1884        /* Don't connect to peers in our blocklist */
1885        if( tr_blocklistHasAddress( t->manager->handle, &atom->addr ) )
1886            continue;
1887
1888        ret[retCount++] = atom;
1889    }
1890
1891    qsort( ret, retCount, sizeof(struct peer_atom*), compareCandidates );
1892    *setmeSize = retCount;
1893    return ret;
1894}
1895
1896static int
1897reconnectPulse( void * vtorrent )
1898{
1899    Torrent * t = vtorrent;
1900    static time_t prevTime = 0;
1901    static int newConnectionsThisSecond = 0;
1902    time_t now;
1903
1904    torrentLock( t );
1905
1906    now = time( NULL );
1907    if( prevTime != now )
1908    {
1909        prevTime = now;
1910        newConnectionsThisSecond = 0;
1911    }
1912
1913    if( !t->isRunning )
1914    {
1915        removeAllPeers( t );
1916    }
1917    else
1918    {
1919        int i, nCandidates, nBad;
1920        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
1921        struct tr_peer ** connections = getPeersToClose( t, &nBad );
1922
1923        if( nBad || nCandidates )
1924            tordbg( t, "reconnect pulse for [%s]: %d bad connections, "
1925                       "%d connection candidates, %d atoms, max per pulse is %d",
1926                       t->tor->info.name, nBad, nCandidates,
1927                       tr_ptrArraySize(t->pool),
1928                       (int)MAX_RECONNECTIONS_PER_PULSE );
1929
1930        /* disconnect some peers.
1931           if we got transferred piece data, then they might be good peers,
1932           so reset their `numFails' weight to zero.  otherwise we connected
1933           to them fruitlessly, so mark it as another fail */
1934        for( i=0; i<nBad; ++i ) {
1935            tr_peer * peer = connections[i];
1936            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1937            if( peer->pieceDataActivityDate )
1938                atom->numFails = 0;
1939            else
1940                ++atom->numFails;
1941            removePeer( t, peer );
1942        }
1943
1944        /* add some new ones */
1945        for( i=0;    i < nCandidates
1946                  && i < MAX_RECONNECTIONS_PER_PULSE
1947                  && newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND; ++i )
1948        {
1949            tr_peerMgr * mgr = t->manager;
1950            struct peer_atom * atom = candidates[i];
1951            tr_peerIo * io;
1952
1953            tordbg( t, "Starting an OUTGOING connection with %s",
1954                       tr_peerIoAddrStr( &atom->addr, atom->port ) );
1955
1956            io = tr_peerIoNewOutgoing( mgr->handle, &atom->addr, atom->port, t->hash );
1957            if( io == NULL )
1958            {
1959                atom->myflags |= MYFLAG_UNREACHABLE;
1960            }
1961            else
1962            {
1963                tr_handshake * handshake = tr_handshakeNew( io,
1964                                                            mgr->handle->encryptionMode,
1965                                                            myHandshakeDoneCB,
1966                                                            mgr );
1967
1968                assert( tr_peerIoGetTorrentHash( io ) != NULL );
1969
1970                ++newConnectionsThisSecond;
1971
1972                tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake, handshakeCompare );
1973            }
1974
1975            atom->time = time( NULL );
1976        }
1977
1978        /* cleanup */
1979        tr_free( connections );
1980        tr_free( candidates );
1981    }
1982
1983    torrentUnlock( t );
1984    return TRUE;
1985}
Note: See TracBrowser for help on using the repository browser.