source: branches/1.0x/libtransmission/peer-mgr.c @ 4855

Last change on this file since 4855 was 4855, checked in by charles, 15 years ago

(1.0x) #663: connection limits don't work correctly.

  • Property svn:keywords set to Date Rev Author Id
File size: 54.3 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 4855 2008-01-28 21:05:48Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp, strstr */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17#include <limits.h> /* INT_MAX */
18
19#include <libgen.h> /* basename */
20
21#include <event.h>
22
23#include "transmission.h"
24#include "clients.h"
25#include "completion.h"
26#include "crypto.h"
27#include "handshake.h"
28#include "net.h"
29#include "peer-io.h"
30#include "peer-mgr.h"
31#include "peer-mgr-private.h"
32#include "peer-msgs.h"
33#include "platform.h"
34#include "ptrarray.h"
35#include "ratecontrol.h"
36#include "shared.h"
37#include "torrent.h"
38#include "trcompat.h" /* strlcpy */
39#include "trevent.h"
40#include "utils.h"
41
42/**
43*** The "SWIFT" system is described by Karthik Tamilmani,
44*** Vinay Pai, and Alexander Mohr of Stony Brook University
45*** in their paper "SWIFT: A System With Incentives For Trading"
46*** http://citeseer.ist.psu.edu/tamilmani04swift.html
47***
48*** More SWIFT constants are defined in peer-mgr-private.h
49**/
50
51/**
52 * Allow new peers to download this many bytes from
53 * us when getting started.  This can prevent gridlock
54 * with other peers using tit-for-tat algorithms
55 */
56static const int SWIFT_INITIAL_CREDIT = 64 * 1024; /* 64 KiB */
57
58/**
59 * We expend a fraction of our torrent's total upload speed
60 * on largesse by uniformly distributing free credit to
61 * all of our peers.  This too helps prevent gridlock.
62 */
63static const double SWIFT_LARGESSE = 0.15; /* 15% of our UL */
64
65/**
66 * How frequently to extend largesse-based credit
67 */
68static const int SWIFT_PERIOD_MSEC = 5000;
69
70
71enum
72{
73    /* how frequently to change which peers are choked */
74    RECHOKE_PERIOD_MSEC = (10 * 1000),
75
76    /* how frequently to refill peers' request lists */
77    REFILL_PERIOD_MSEC = 666,
78
79    /* following the BT spec, we consider ourselves `snubbed' if
80     * we're we don't get piece data from a peer in this long */
81    SNUBBED_SEC = 60,
82
83    /* when many peers are available, keep idle ones this long */
84    MIN_UPLOAD_IDLE_SECS = (60 * 3),
85
86    /* when few peers are available, keep idle ones this long */
87    MAX_UPLOAD_IDLE_SECS = (60 * 10),
88
89    /* how frequently to decide which peers live and die */
90    RECONNECT_PERIOD_MSEC = (2 * 1000),
91
92    /* max # of peers to ask fer per torrent per reconnect pulse */
93    MAX_RECONNECTIONS_PER_PULSE = 1,
94
95    /* max number of peers to ask for per second overall.
96     * this throttle is to avoid overloading the router */
97    MAX_CONNECTIONS_PER_SECOND = 8,
98
99    /* number of unchoked peers per torrent */
100    MAX_UNCHOKED_PEERS = 12,
101
102    /* corresponds to ut_pex's added.f flags */
103    ADDED_F_ENCRYPTION_FLAG = 1,
104
105    /* corresponds to ut_pex's added.f flags */
106    ADDED_F_SEED_FLAG = 2,
107
108    /* number of bad pieces a peer is allowed to send before we ban them */
109    MAX_BAD_PIECES_PER_PEER = 3,
110
111    /* use for bitwise operations w/peer_atom.myflags */
112    MYFLAG_BANNED = 1,
113
114    /* unreachable for now... but not banned.  if they try to connect to us it's okay */
115    MYFLAG_UNREACHABLE = 2
116};
117
118
119/**
120***
121**/
122
123/* We keep one of these for every peer we know about, whether
124 * it's connected or not, so the struct must be small.
125 * When our current connections underperform, we dip back
126 * into this list for new ones. */
127struct peer_atom
128{   
129    uint8_t from;
130    uint8_t flags; /* these match the added_f flags */
131    uint8_t myflags; /* flags that aren't defined in added_f */
132    uint16_t port;
133    uint16_t numFails;
134    struct in_addr addr;
135    time_t time;
136    time_t piece_data_time;
137};
138
139typedef struct
140{
141    uint8_t hash[SHA_DIGEST_LENGTH];
142    tr_ptrArray * outgoingHandshakes; /* tr_handshake */
143    tr_ptrArray * pool; /* struct peer_atom */
144    tr_ptrArray * peers; /* tr_peer */
145    tr_timer * reconnectTimer;
146    tr_timer * rechokeTimer;
147    tr_timer * refillTimer;
148    tr_timer * swiftTimer;
149    tr_torrent * tor;
150    tr_peer * optimistic; /* the optimistic peer, or NULL if none */
151    tr_bitfield * requested;
152
153    unsigned int isRunning : 1;
154
155    struct tr_peerMgr * manager;
156}
157Torrent;
158
159struct tr_peerMgr
160{
161    tr_handle * handle;
162    tr_ptrArray * torrents; /* Torrent */
163    tr_ptrArray * incomingHandshakes; /* tr_handshake */
164};
165
166/**
167***
168**/
169
170static void
171myDebug( const char * file, int line, const Torrent * t, const char * fmt, ... )
172{
173    FILE * fp = tr_getLog( );
174    if( fp != NULL )
175    {
176        va_list args;
177        char timestr[64];
178        struct evbuffer * buf = evbuffer_new( );
179        char * myfile = tr_strdup( file );
180
181        evbuffer_add_printf( buf, "[%s] ", tr_getLogTimeStr( timestr, sizeof(timestr) ) );
182        if( t != NULL )
183            evbuffer_add_printf( buf, "%s ", t->tor->info.name );
184        va_start( args, fmt );
185        evbuffer_add_vprintf( buf, fmt, args );
186        va_end( args );
187        evbuffer_add_printf( buf, " (%s:%d)\n", basename(myfile), line );
188        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
189
190        tr_free( myfile );
191        evbuffer_free( buf );
192    }
193}
194
195#define tordbg(t, fmt...) myDebug(__FILE__, __LINE__, t, ##fmt )
196
197/**
198***
199**/
200
201static void
202managerLock( struct tr_peerMgr * manager )
203{
204    tr_globalLock( manager->handle );
205}
206static void
207managerUnlock( struct tr_peerMgr * manager )
208{
209    tr_globalUnlock( manager->handle );
210}
211static void
212torrentLock( Torrent * torrent )
213{
214    managerLock( torrent->manager );
215}
216static void
217torrentUnlock( Torrent * torrent )
218{
219    managerUnlock( torrent->manager );
220}
221static int
222torrentIsLocked( const Torrent * t )
223{
224    return tr_globalIsLocked( t->manager->handle );
225}
226
227/**
228***
229**/
230
231static int
232compareAddresses( const struct in_addr * a, const struct in_addr * b )
233{
234    return tr_compareUint32( a->s_addr, b->s_addr );
235}
236
237static int
238handshakeCompareToAddr( const void * va, const void * vb )
239{
240    const tr_handshake * a = va;
241    return compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
242}
243
244static int
245handshakeCompare( const void * a, const void * b )
246{
247    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
248}
249
250static tr_handshake*
251getExistingHandshake( tr_ptrArray * handshakes, const struct in_addr * in_addr )
252{
253    return tr_ptrArrayFindSorted( handshakes,
254                                  in_addr,
255                                  handshakeCompareToAddr );
256}
257
258static int
259comparePeerAtomToAddress( const void * va, const void * vb )
260{
261    const struct peer_atom * a = va;
262    return compareAddresses( &a->addr, vb );
263}
264
265static int
266comparePeerAtoms( const void * va, const void * vb )
267{
268    const struct peer_atom * b = vb;
269    return comparePeerAtomToAddress( va, &b->addr );
270}
271
272/**
273***
274**/
275
276static int
277torrentCompare( const void * va, const void * vb )
278{
279    const Torrent * a = va;
280    const Torrent * b = vb;
281    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
282}
283
284static int
285torrentCompareToHash( const void * va, const void * vb )
286{
287    const Torrent * a = va;
288    const uint8_t * b_hash = vb;
289    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
290}
291
292static Torrent*
293getExistingTorrent( tr_peerMgr * manager, const uint8_t * hash )
294{
295    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
296                                             hash,
297                                             torrentCompareToHash );
298}
299
300static int
301peerCompare( const void * va, const void * vb )
302{
303    const tr_peer * a = va;
304    const tr_peer * b = vb;
305    return compareAddresses( &a->in_addr, &b->in_addr );
306}
307
308static int
309peerCompareToAddr( const void * va, const void * vb )
310{
311    const tr_peer * a = va;
312    return compareAddresses( &a->in_addr, vb );
313}
314
315static tr_peer*
316getExistingPeer( Torrent * torrent, const struct in_addr * in_addr )
317{
318    assert( torrentIsLocked( torrent ) );
319    assert( in_addr != NULL );
320
321    return (tr_peer*) tr_ptrArrayFindSorted( torrent->peers,
322                                             in_addr,
323                                             peerCompareToAddr );
324}
325
326static struct peer_atom*
327getExistingAtom( const Torrent * t, const struct in_addr * addr )
328{
329    assert( torrentIsLocked( t ) );
330    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
331}
332
333static int
334peerIsInUse( const Torrent * ct, const struct in_addr * addr )
335{
336    Torrent * t = (Torrent*) ct;
337
338    assert( torrentIsLocked ( t ) );
339
340    return getExistingPeer( t, addr )
341        || getExistingHandshake( t->outgoingHandshakes, addr )
342        || getExistingHandshake( t->manager->incomingHandshakes, addr );
343}
344
345static tr_peer*
346peerConstructor( const struct in_addr * in_addr )
347{
348    tr_peer * p;
349    p = tr_new0( tr_peer, 1 );
350    p->credit = SWIFT_INITIAL_CREDIT;
351    p->rcToClient = tr_rcInit( );
352    p->rcToPeer = tr_rcInit( );
353    memcpy( &p->in_addr, in_addr, sizeof(struct in_addr) );
354    return p;
355}
356
357static tr_peer*
358getPeer( Torrent * torrent, const struct in_addr * in_addr )
359{
360    tr_peer * peer;
361
362    assert( torrentIsLocked( torrent ) );
363
364    peer = getExistingPeer( torrent, in_addr );
365
366    if( peer == NULL ) {
367        peer = peerConstructor( in_addr );
368        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
369    }
370
371    return peer;
372}
373
374static void
375peerDestructor( tr_peer * peer )
376{
377    assert( peer != NULL );
378    assert( peer->msgs != NULL );
379
380    tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
381    tr_peerMsgsFree( peer->msgs );
382
383    tr_peerIoFree( peer->io );
384
385    tr_bitfieldFree( peer->have );
386    tr_bitfieldFree( peer->blame );
387    tr_rcClose( peer->rcToClient );
388    tr_rcClose( peer->rcToPeer );
389    tr_free( peer->client );
390    tr_free( peer );
391}
392
393static void
394removePeer( Torrent * t, tr_peer * peer )
395{
396    tr_peer * removed;
397    struct peer_atom * atom;
398
399    assert( torrentIsLocked( t ) );
400
401    atom = getExistingAtom( t, &peer->in_addr );
402    assert( atom != NULL );
403    atom->time = time( NULL );
404
405    removed = tr_ptrArrayRemoveSorted  ( t->peers, peer, peerCompare );
406    assert( removed == peer );
407    peerDestructor( removed );
408}
409
410static void
411removeAllPeers( Torrent * t )
412{
413    while( !tr_ptrArrayEmpty( t->peers ) )
414        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
415}
416
417static void
418torrentDestructor( Torrent * t )
419{
420    uint8_t hash[SHA_DIGEST_LENGTH];
421
422    assert( t != NULL );
423    assert( !t->isRunning );
424    assert( t->peers != NULL );
425    assert( torrentIsLocked( t ) );
426    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
427    assert( tr_ptrArrayEmpty( t->peers ) );
428
429    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
430
431    tr_timerFree( &t->reconnectTimer );
432    tr_timerFree( &t->rechokeTimer );
433    tr_timerFree( &t->refillTimer );
434    tr_timerFree( &t->swiftTimer );
435
436    tr_bitfieldFree( t->requested );
437    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
438    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
439    tr_ptrArrayFree( t->peers, NULL );
440
441    tr_free( t );
442}
443
444static Torrent*
445torrentConstructor( tr_peerMgr * manager, tr_torrent * tor )
446{
447    Torrent * t;
448
449    t = tr_new0( Torrent, 1 );
450    t->manager = manager;
451    t->tor = tor;
452    t->pool = tr_ptrArrayNew( );
453    t->peers = tr_ptrArrayNew( );
454    t->outgoingHandshakes = tr_ptrArrayNew( );
455    t->requested = tr_bitfieldNew( tor->blockCount );
456    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
457
458    return t;
459}
460
461/**
462 * For explanation, see http://www.bittorrent.org/fast_extensions.html
463 * Also see the "test-allowed-set" unit test
464 */
465struct tr_bitfield *
466tr_peerMgrGenerateAllowedSet( const uint32_t         k,         /* number of pieces in set */
467                              const uint32_t         sz,        /* number of pieces in torrent */
468                              const uint8_t        * infohash,  /* torrent's SHA1 hash*/
469                              const struct in_addr * ip )       /* peer's address */
470{
471    uint8_t w[SHA_DIGEST_LENGTH + 4];
472    uint8_t x[SHA_DIGEST_LENGTH];
473    tr_bitfield_t * a;
474    uint32_t a_size;
475
476    *(uint32_t*)w = ntohl( htonl(ip->s_addr) & 0xffffff00 );   /* (1) */
477    memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );              /* (2) */
478    tr_sha1( x, w, sizeof( w ), NULL );                        /* (3) */
479
480    a = tr_bitfieldNew( sz );
481    a_size = 0;
482   
483    while( a_size < k )
484    {
485        int i;
486        for ( i=0; i<5 && a_size<k; ++i )                      /* (4) */
487        {
488            uint32_t j = i * 4;                                /* (5) */
489            uint32_t y = ntohl(*(uint32_t*)(x+j));             /* (6) */
490            uint32_t index = y % sz;                           /* (7) */
491            if ( !tr_bitfieldHas( a, index ) ) {               /* (8) */
492                tr_bitfieldAdd( a, index );                    /* (9) */
493                ++a_size;
494            }
495        }
496        tr_sha1( x, x, sizeof( x ), NULL );                    /* (3) */
497    }
498   
499    return a;
500}
501
502tr_peerMgr*
503tr_peerMgrNew( tr_handle * handle )
504{
505    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
506    m->handle = handle;
507    m->torrents = tr_ptrArrayNew( );
508    m->incomingHandshakes = tr_ptrArrayNew( );
509    return m;
510}
511
512void
513tr_peerMgrFree( tr_peerMgr * manager )
514{
515    managerLock( manager );
516
517    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
518     * the item from manager->handshakes, so this is a little roundabout... */
519    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
520        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
521    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
522
523    /* free the torrents. */
524    tr_ptrArrayFree( manager->torrents, (PtrArrayForeachFunc)torrentDestructor );
525
526    managerUnlock( manager );
527    tr_free( manager );
528}
529
530static tr_peer**
531getConnectedPeers( Torrent * t, int * setmeCount )
532{
533    int i, peerCount, connectionCount;
534    tr_peer **peers;
535    tr_peer **ret;
536
537    assert( torrentIsLocked( t ) );
538
539    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
540    ret = tr_new( tr_peer*, peerCount );
541
542    for( i=connectionCount=0; i<peerCount; ++i )
543        if( peers[i]->msgs != NULL )
544            ret[connectionCount++] = peers[i];
545
546    *setmeCount = connectionCount;
547    return ret;
548}
549
550/***
551****  Refill
552***/
553
554struct tr_refill_piece
555{
556    tr_priority_t priority;
557    int percentDone;
558    uint16_t random;
559    uint32_t piece;
560    uint32_t peerCount;
561    uint32_t fastAllowed;
562    uint32_t suggested;
563};
564
565static int
566compareRefillPiece (const void * aIn, const void * bIn)
567{
568    const struct tr_refill_piece * a = aIn;
569    const struct tr_refill_piece * b = bIn;
570   
571    /* if one piece has a higher priority, it goes first */
572    if( a->priority != b->priority )
573        return a->priority > b->priority ? -1 : 1;
574
575    /* try to fill partial pieces */
576    if( a->percentDone != b->percentDone )
577        return a->percentDone > b->percentDone ? -1 : 1;
578   
579    /* if one *might be* fastallowed to us, get it first...
580     * I'm putting it on top so we prioritize those pieces at
581     * startup, then we'll have them, and we'll be denied access
582     * to them */
583    if (a->fastAllowed != b->fastAllowed)
584        return a->fastAllowed < b->fastAllowed ? -1 : 1;
585   
586    /* otherwise if one was suggested to us, get it */
587    if (a->suggested != b->suggested)
588        return a->suggested < b->suggested ? -1 : 1;
589   
590    /* otherwise if one has fewer peers, it goes first */
591    if (a->peerCount != b->peerCount)
592        return a->peerCount < b->peerCount ? -1 : 1;
593
594    /* otherwise go with our random seed */
595    return tr_compareUint16( a->random, b->random );
596}
597
598static int
599isPieceInteresting( const tr_torrent  * tor,
600                    int                 piece )
601{
602    if( tor->info.pieces[piece].dnd ) /* we don't want it */
603        return 0;
604
605    if( tr_cpPieceIsComplete( tor->completion, piece ) ) /* we have it */
606        return 0;
607
608    return 1;
609}
610
611static uint32_t*
612getPreferredPieces( Torrent     * t,
613                    uint32_t    * pieceCount )
614{
615    const tr_torrent * tor = t->tor;
616    const tr_info * inf = &tor->info;
617    int i;
618    uint32_t poolSize = 0;
619    uint32_t * pool = tr_new( uint32_t, inf->pieceCount );
620    int peerCount;
621    tr_peer** peers;
622
623    assert( torrentIsLocked( t ) );
624
625    peers = getConnectedPeers( t, &peerCount );
626
627    for( i=0; i<inf->pieceCount; ++i )
628        if( isPieceInteresting( tor, i ) )
629            pool[poolSize++] = i;
630
631    /* sort the pool from most interesting to least... */
632    if( poolSize > 1 )
633    {
634        uint32_t j;
635        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
636
637        for( j=0; j<poolSize; ++j )
638        {
639            int k;
640            const int piece = pool[j];
641            struct tr_refill_piece * setme = p + j;
642
643            setme->piece = piece;
644            setme->priority = inf->pieces[piece].priority;
645            setme->peerCount = 0;
646            setme->fastAllowed = 0;
647            setme->random = tr_rand( UINT16_MAX );
648            setme->percentDone = (int)( 100.0 * tr_cpPercentBlocksInPiece( tor->completion, piece ) );
649
650            for( k=0; k<peerCount; ++k ) {
651                const tr_peer * peer = peers[k];
652                if( peer->peerIsInterested && !peer->clientIsChoked && tr_bitfieldHas( peer->have, piece ) )
653                    ++setme->peerCount;
654                /* The fast peer extension doesn't force a peer to actually HAVE a fast-allowed piece,
655                    but we're guaranteed to get the same pieces from different peers,
656                    so we'll build a list and pray one actually have this one */
657                setme->fastAllowed = tr_peerMsgsIsPieceFastAllowed( peer->msgs, i );
658                /* Also, if someone SUGGESTed a piece to us, prioritize it over non-suggested others
659                 */
660                setme->suggested   = tr_peerMsgsIsPieceSuggested( peer->msgs, i );
661            }
662        }
663
664        qsort( p, poolSize, sizeof(struct tr_refill_piece), compareRefillPiece );
665
666        for( j=0; j<poolSize; ++j )
667            pool[j] = p[j].piece;
668
669        tr_free( p );
670    }
671
672    tr_free( peers );
673
674    *pieceCount = poolSize;
675    return pool;
676}
677
678static uint64_t*
679getPreferredBlocks( Torrent * t, uint64_t * setmeCount )
680{
681    int s;
682    uint32_t i;
683    uint32_t pieceCount;
684    uint32_t blockCount;
685    uint32_t unreqCount[3], reqCount[3];
686    uint32_t * pieces;
687    uint64_t * ret, * walk;
688    uint64_t * unreq[3], *req[3];
689    const tr_torrent * tor = t->tor;
690
691    assert( torrentIsLocked( t ) );
692
693    pieces = getPreferredPieces( t, &pieceCount );
694
695    /**
696     * Now we walk through those preferred pieces to find all the blocks
697     * are still missing from them.  We put unrequested blocks first,
698     * of course, but by including requested blocks afterwards, endgame
699     * handling happens naturally.
700     *
701     * By doing this once per priority we also effectively get an endgame
702     * mode for each priority level.  The helps keep high priority files
703     * from getting stuck at 99% due of unresponsive peers.
704     */
705
706    /* make temporary bins for the four tiers of blocks */
707    for( i=0; i<3; ++i ) {
708        req[i] = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
709        reqCount[i] = 0;
710        unreq[i] = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
711        unreqCount[i] = 0;
712    }
713
714    /* sort the blocks into our temp bins */
715    for( i=blockCount=0; i<pieceCount; ++i )
716    {
717        const uint32_t index = pieces[i];
718        const int priorityIndex = tor->info.pieces[index].priority + 1;
719        const int begin = tr_torPieceFirstBlock( tor, index );
720        const int end = begin + tr_torPieceCountBlocks( tor, (int)index );
721        int block;
722
723        for( block=begin; block<end; ++block )
724        {
725            if( tr_cpBlockIsComplete( tor->completion, block ) )
726                continue;
727
728            ++blockCount;
729
730            if( tr_bitfieldHas( t->requested, block ) )
731            {
732                const uint32_t n = reqCount[priorityIndex]++;
733                req[priorityIndex][n] = block;
734            }
735            else
736            {
737                const uint32_t n = unreqCount[priorityIndex]++;
738                unreq[priorityIndex][n] = block;
739            }
740        }
741    }
742
743    /* join the bins together, going from highest priority to lowest so
744     * the the blocks we want to request first will be first in the list */
745    ret = walk = tr_new( uint64_t, blockCount );
746    for( s=2; s>=0; --s ) {
747        memcpy( walk, unreq[s], sizeof(uint64_t) * unreqCount[s] );
748        walk += unreqCount[s];
749        memcpy( walk, req[s], sizeof(uint64_t) * reqCount[s] );
750        walk += reqCount[s];
751    }
752    assert( ( walk - ret ) == ( int )blockCount );
753    *setmeCount = blockCount;
754
755    /* cleanup */
756    tr_free( pieces );
757    for( i=0; i<3; ++i ) {
758        tr_free( unreq[i] );
759        tr_free( req[i] );
760    }
761    return ret;
762}
763
764static int
765refillPulse( void * vtorrent )
766{
767    Torrent * t = vtorrent;
768    tr_torrent * tor = t->tor;
769    uint32_t i;
770    int peerCount;
771    tr_peer ** peers;
772    uint64_t blockCount;
773    uint64_t * blocks;
774
775    if( !t->isRunning )
776        return TRUE;
777    if( tr_torrentIsSeed( t->tor ) )
778        return TRUE;
779
780    torrentLock( t );
781    tordbg( t, "Refilling Request Buffers..." );
782
783    blocks = getPreferredBlocks( t, &blockCount );
784    peers = getConnectedPeers( t, &peerCount );
785
786    for( i=0; peerCount && i<blockCount; ++i )
787    {
788        const uint64_t block = blocks[i];
789        const uint32_t index = tr_torBlockPiece( tor, block );
790        const uint32_t begin = (block * tor->blockSize) - (index * tor->info.pieceSize);
791        const uint32_t length = tr_torBlockCountBytes( tor, (int)block );
792        int j;
793        assert( _tr_block( tor, index, begin ) == (int)block );
794        assert( begin < (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
795        assert( (begin + length) <= (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
796
797        /* find a peer who can ask for this block */
798        for( j=0; j<peerCount; )
799        {
800            const int val = tr_peerMsgsAddRequest( peers[j]->msgs, index, begin, length );
801            switch( val )
802            {
803                case TR_ADDREQ_FULL: 
804                case TR_ADDREQ_CLIENT_CHOKED:
805                    memmove( peers+j, peers+j+1, sizeof(tr_peer*)*(--peerCount-j) );
806                    break;
807
808                case TR_ADDREQ_MISSING: 
809                case TR_ADDREQ_DUPLICATE: 
810                    ++j;
811                    break;
812
813                case TR_ADDREQ_OK:
814                    tr_bitfieldAdd( t->requested, block );
815                    j = peerCount;
816                    break;
817
818                default:
819                    assert( 0 && "unhandled value" );
820                    break;
821            }
822        }
823    }
824
825    /* cleanup */
826    tr_free( peers );
827    tr_free( blocks );
828
829    t->refillTimer = NULL;
830    torrentUnlock( t );
831    return FALSE;
832}
833
834static void
835broadcastClientHave( Torrent * t, uint32_t index )
836{
837    int i, size;
838    tr_peer ** peers;
839
840    assert( torrentIsLocked( t ) );
841
842    peers = getConnectedPeers( t, &size );
843    for( i=0; i<size; ++i )
844        tr_peerMsgsHave( peers[i]->msgs, index );
845    tr_free( peers );
846}
847
848static void
849broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
850{
851    int i, size;
852    tr_peer ** peers;
853
854    assert( torrentIsLocked( t ) );
855
856    peers = getConnectedPeers( t, &size );
857    for( i=0; i<size; ++i )
858        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
859    tr_free( peers );
860}
861
862static void
863addStrike( Torrent * t, tr_peer * peer )
864{
865    tordbg( t, "increasing peer %s strike count to %d", tr_peerIoAddrStr(&peer->in_addr,peer->port), peer->strikes+1 );
866
867    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
868    {
869        struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
870        atom->myflags |= MYFLAG_BANNED;
871        peer->doPurge = 1;
872        tordbg( t, "banning peer %s", tr_peerIoAddrStr(&atom->addr,atom->port) );
873    }
874}
875
876static void
877msgsCallbackFunc( void * vpeer, void * vevent, void * vt )
878{
879    tr_peer * peer = vpeer;
880    Torrent * t = (Torrent *) vt;
881    const tr_peermsgs_event * e = (const tr_peermsgs_event *) vevent;
882
883    torrentLock( t );
884
885    switch( e->eventType )
886    {
887        case TR_PEERMSG_NEED_REQ:
888            if( t->refillTimer == NULL )
889                t->refillTimer = tr_timerNew( t->manager->handle,
890                                              refillPulse, t,
891                                              REFILL_PERIOD_MSEC );
892            break;
893
894        case TR_PEERMSG_CANCEL:
895            tr_bitfieldRem( t->requested, _tr_block( t->tor, e->pieceIndex, e->offset ) );
896            break;
897
898        case TR_PEERMSG_PIECE_DATA: {
899            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
900            atom->piece_data_time = time( NULL );
901            break;
902        }
903
904        case TR_PEERMSG_CLIENT_HAVE:
905            broadcastClientHave( t, e->pieceIndex );
906            tr_torrentRecheckCompleteness( t->tor );
907            break;
908
909        case TR_PEERMSG_PEER_PROGRESS: {
910            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
911            const int peerIsSeed = e->progress >= 1.0;
912            if( peerIsSeed ) {
913                tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr(&atom->addr,atom->port) );
914                atom->flags |= ADDED_F_SEED_FLAG;
915            } else {
916                tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr(&atom->addr,atom->port) );
917                atom->flags &= ~ADDED_F_SEED_FLAG;
918            } break;
919        }
920
921        case TR_PEERMSG_CLIENT_BLOCK:
922            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
923            break;
924
925        case TR_PEERMSG_GOT_ASSERT_ERROR:
926            addStrike( t, peer );
927            peer->doPurge = 1;
928            break;
929
930        case TR_PEERMSG_ERROR:
931            peer->doPurge = 1;
932            break;
933
934        default:
935            assert(0);
936    }
937
938    torrentUnlock( t );
939}
940
941static void
942ensureAtomExists( Torrent * t, const struct in_addr * addr, uint16_t port, uint8_t flags, uint8_t from )
943{
944    if( getExistingAtom( t, addr ) == NULL )
945    {
946        struct peer_atom * a;
947        a = tr_new0( struct peer_atom, 1 );
948        a->addr = *addr;
949        a->port = port;
950        a->flags = flags;
951        a->from = from;
952        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr(&a->addr,a->port) );
953        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
954    }
955}
956
957static int
958getMaxPeerCount( const tr_torrent * tor UNUSED )
959{
960    return tor->maxConnectedPeers;
961}
962
963/* FIXME: this is kind of a mess. */
964static void
965myHandshakeDoneCB( tr_handshake    * handshake,
966                   tr_peerIo       * io,
967                   int               isConnected,
968                   const uint8_t   * peer_id,
969                   void            * vmanager )
970{
971    int ok = isConnected;
972    uint16_t port;
973    const struct in_addr * addr;
974    tr_peerMgr * manager = (tr_peerMgr*) vmanager;
975    Torrent * t;
976    tr_handshake * ours;
977
978    assert( io != NULL );
979    assert( isConnected==0 || isConnected==1 );
980
981    t = tr_peerIoHasTorrentHash( io )
982        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
983        : NULL;
984
985    if( tr_peerIoIsIncoming ( io ) )
986        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
987                                        handshake, handshakeCompare );
988    else if( t != NULL )
989        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
990                                        handshake, handshakeCompare );
991    else
992        ours = handshake;
993
994    assert( ours != NULL );
995    assert( ours == handshake );
996
997    if( t != NULL )
998        torrentLock( t );
999
1000    addr = tr_peerIoGetAddress( io, &port );
1001
1002    if( !ok || !t || !t->isRunning )
1003    {
1004        if( t ) {
1005            struct peer_atom * atom = getExistingAtom( t, addr );
1006            if( atom )
1007                ++atom->numFails;
1008        }
1009
1010        tr_peerIoFree( io );
1011    }
1012    else /* looking good */
1013    {
1014        struct peer_atom * atom;
1015        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1016        atom = getExistingAtom( t, addr );
1017
1018        if( atom->myflags & MYFLAG_BANNED )
1019        {
1020            tordbg( t, "banned peer %s tried to reconnect", tr_peerIoAddrStr(&atom->addr,atom->port) );
1021            tr_peerIoFree( io );
1022        }
1023        else if( tr_ptrArraySize( t->peers ) >= getMaxPeerCount( t->tor ) )
1024        {
1025            tr_peerIoFree( io );
1026        }
1027        else
1028        {
1029            tr_peer * peer = getExistingPeer( t, addr );
1030
1031            if( peer != NULL ) /* we already have this peer */
1032            {
1033                tr_peerIoFree( io );
1034            }
1035            else
1036            {
1037                peer = getPeer( t, addr );
1038                tr_free( peer->client );
1039                peer->client = peer_id ? tr_clientForId( peer_id ) : NULL;
1040                peer->port = port;
1041                peer->io = io;
1042                peer->msgs = tr_peerMsgsNew( t->tor, peer, msgsCallbackFunc, t, &peer->msgsTag );
1043                atom->time = time( NULL );
1044            }
1045        }
1046    }
1047
1048    if( t != NULL )
1049        torrentUnlock( t );
1050}
1051
1052void
1053tr_peerMgrAddIncoming( tr_peerMgr      * manager,
1054                       struct in_addr  * addr,
1055                       uint16_t          port,
1056                       int               socket )
1057{
1058    managerLock( manager );
1059
1060    if( getExistingHandshake( manager->incomingHandshakes, addr ) )
1061    {
1062        tr_netClose( socket );
1063    }
1064    else /* we don't have a connetion to them yet... */
1065    {
1066        tr_peerIo * io;
1067        tr_handshake * handshake;
1068
1069        tordbg( NULL, "Got an INCOMING connection with %s", tr_peerIoAddrStr( addr, port ) );
1070
1071        io = tr_peerIoNewIncoming( manager->handle, addr, port, socket );
1072
1073        handshake = tr_handshakeNew( io,
1074                                     manager->handle->encryptionMode,
1075                                     myHandshakeDoneCB,
1076                                     manager );
1077
1078        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake, handshakeCompare );
1079    }
1080
1081    managerUnlock( manager );
1082}
1083
1084void
1085tr_peerMgrAddPex( tr_peerMgr     * manager,
1086                  const uint8_t  * torrentHash,
1087                  uint8_t          from,
1088                  const tr_pex   * pex )
1089{
1090    Torrent * t;
1091    managerLock( manager );
1092
1093    t = getExistingTorrent( manager, torrentHash );
1094    ensureAtomExists( t, &pex->in_addr, pex->port, pex->flags, from );
1095
1096    managerUnlock( manager );
1097}
1098
1099void
1100tr_peerMgrAddPeers( tr_peerMgr    * manager,
1101                    const uint8_t * torrentHash,
1102                    uint8_t         from,
1103                    const uint8_t * peerCompact,
1104                    int             peerCount )
1105{
1106    int i;
1107    const uint8_t * walk = peerCompact;
1108    Torrent * t;
1109
1110    managerLock( manager );
1111
1112    t = getExistingTorrent( manager, torrentHash );
1113    for( i=0; t!=NULL && i<peerCount; ++i )
1114    {
1115        struct in_addr addr;
1116        uint16_t port;
1117        memcpy( &addr, walk, 4 ); walk += 4;
1118        memcpy( &port, walk, 2 ); walk += 2;
1119        ensureAtomExists( t, &addr, port, 0, from );
1120    }
1121
1122    managerUnlock( manager );
1123}
1124
1125/**
1126***
1127**/
1128
1129void
1130tr_peerMgrSetBlame( tr_peerMgr     * manager,
1131                    const uint8_t  * torrentHash,
1132                    int              pieceIndex,
1133                    int              success )
1134{
1135    if( !success )
1136    {
1137        int peerCount, i;
1138        Torrent * t = getExistingTorrent( manager, torrentHash );
1139        tr_peer ** peers;
1140
1141        assert( torrentIsLocked( t ) );
1142
1143        peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1144        for( i=0; i<peerCount; ++i )
1145        {
1146            tr_peer * peer = peers[i];
1147            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1148            {
1149                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1150                           tr_peerIoAddrStr(&peer->in_addr,peer->port),
1151                           pieceIndex, (int)peer->strikes+1 );
1152                addStrike( t, peer );
1153            }
1154        }
1155    }
1156}
1157
1158int
1159tr_pexCompare( const void * va, const void * vb )
1160{
1161    const tr_pex * a = (const tr_pex *) va;
1162    const tr_pex * b = (const tr_pex *) vb;
1163    int i = memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
1164    if( i ) return i;
1165    if( a->port < b->port ) return -1;
1166    if( a->port > b->port ) return 1;
1167    return 0;
1168}
1169
1170int tr_pexCompare( const void * a, const void * b );
1171
1172static int
1173peerPrefersCrypto( const tr_peer * peer )
1174{
1175    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1176        return TRUE;
1177
1178    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1179        return FALSE;
1180
1181    return tr_peerIoIsEncrypted( peer->io );
1182};
1183
1184int
1185tr_peerMgrGetPeers( tr_peerMgr      * manager,
1186                    const uint8_t   * torrentHash,
1187                    tr_pex         ** setme_pex )
1188{
1189    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1190    int i, peerCount;
1191    const tr_peer ** peers;
1192    tr_pex * pex;
1193    tr_pex * walk;
1194
1195    torrentLock( (Torrent*)t );
1196
1197    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1198    pex = walk = tr_new( tr_pex, peerCount );
1199
1200    for( i=0; i<peerCount; ++i, ++walk )
1201    {
1202        const tr_peer * peer = peers[i];
1203
1204        walk->in_addr = peer->in_addr;
1205
1206        walk->port = peer->port;
1207
1208        walk->flags = 0;
1209        if( peerPrefersCrypto(peer) )  walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1210        if( peer->progress >= 1.0 )    walk->flags |= ADDED_F_SEED_FLAG;
1211    }
1212
1213    assert( ( walk - pex ) == peerCount );
1214    qsort( pex, peerCount, sizeof(tr_pex), tr_pexCompare );
1215    *setme_pex = pex;
1216
1217    torrentUnlock( (Torrent*)t );
1218
1219    return peerCount;
1220}
1221
1222static int reconnectPulse( void * vtorrent );
1223static int rechokePulse( void * vtorrent );
1224static int swiftPulse( void * vtorrent );
1225
1226void
1227tr_peerMgrStartTorrent( tr_peerMgr     * manager,
1228                        const uint8_t  * torrentHash )
1229{
1230    Torrent * t;
1231
1232    managerLock( manager );
1233
1234    t = getExistingTorrent( manager, torrentHash );
1235
1236    assert( t != NULL );
1237    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1238    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1239    assert( ( t->isRunning != 0 ) == ( t->swiftTimer != NULL ) );
1240
1241    if( !t->isRunning )
1242    {
1243        t->isRunning = 1;
1244
1245        t->reconnectTimer = tr_timerNew( t->manager->handle,
1246                                         reconnectPulse, t,
1247                                         RECONNECT_PERIOD_MSEC );
1248
1249        t->rechokeTimer = tr_timerNew( t->manager->handle,
1250                                       rechokePulse, t,
1251                                       RECHOKE_PERIOD_MSEC );
1252
1253        t->swiftTimer = tr_timerNew( t->manager->handle,
1254                                     swiftPulse, t,
1255                                     SWIFT_PERIOD_MSEC );
1256
1257        reconnectPulse( t );
1258
1259        rechokePulse( t );
1260
1261        swiftPulse( t );
1262    }
1263
1264    managerUnlock( manager );
1265}
1266
1267static void
1268stopTorrent( Torrent * t )
1269{
1270    assert( torrentIsLocked( t ) );
1271
1272    t->isRunning = 0;
1273    tr_timerFree( &t->rechokeTimer );
1274    tr_timerFree( &t->reconnectTimer );
1275    tr_timerFree( &t->swiftTimer );
1276
1277    /* disconnect the peers. */
1278    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)peerDestructor );
1279    tr_ptrArrayClear( t->peers );
1280
1281    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1282     * which removes the handshake from t->outgoingHandshakes... */
1283    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1284        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1285}
1286void
1287tr_peerMgrStopTorrent( tr_peerMgr     * manager,
1288                       const uint8_t  * torrentHash)
1289{
1290    managerLock( manager );
1291
1292    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1293
1294    managerUnlock( manager );
1295}
1296
1297void
1298tr_peerMgrAddTorrent( tr_peerMgr * manager,
1299                      tr_torrent * tor )
1300{
1301    Torrent * t;
1302
1303    managerLock( manager );
1304
1305    assert( tor != NULL );
1306    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1307
1308    t = torrentConstructor( manager, tor );
1309    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1310
1311    managerUnlock( manager );
1312}
1313
1314void
1315tr_peerMgrRemoveTorrent( tr_peerMgr     * manager,
1316                         const uint8_t  * torrentHash )
1317{
1318    Torrent * t;
1319
1320    managerLock( manager );
1321
1322    t = getExistingTorrent( manager, torrentHash );
1323    assert( t != NULL );
1324    stopTorrent( t );
1325    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1326    torrentDestructor( t );
1327
1328    managerUnlock( manager );
1329}
1330
1331void
1332tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1333                               const uint8_t    * torrentHash,
1334                               int8_t           * tab,
1335                               int                tabCount )
1336{
1337    int i;
1338    const Torrent * t;
1339    const tr_torrent * tor;
1340    float interval;
1341
1342    managerLock( (tr_peerMgr*)manager );
1343
1344    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1345    tor = t->tor;
1346    interval = tor->info.pieceCount / (float)tabCount;
1347
1348    memset( tab, 0, tabCount );
1349
1350    for( i=0; i<tabCount; ++i )
1351    {
1352        const int piece = i * interval;
1353
1354        if( tor == NULL )
1355            tab[i] = 0;
1356        else if( tr_cpPieceIsComplete( tor->completion, piece ) )
1357            tab[i] = -1;
1358        else {
1359            int j, peerCount;
1360            const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1361            for( j=0; j<peerCount; ++j )
1362                if( tr_bitfieldHas( peers[j]->have, i ) )
1363                    ++tab[i];
1364        }
1365    }
1366
1367    managerUnlock( (tr_peerMgr*)manager );
1368}
1369
1370/* Returns the pieces that we and/or a connected peer has */
1371tr_bitfield*
1372tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1373                        const uint8_t    * torrentHash )
1374{
1375    int i, size;
1376    const Torrent * t;
1377    const tr_peer ** peers;
1378    tr_bitfield * pieces;
1379
1380    managerLock( (tr_peerMgr*)manager );
1381
1382    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1383    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1384    pieces = tr_bitfieldDup( tr_cpPieceBitfield( t->tor->completion ) );
1385    for( i=0; i<size; ++i )
1386        if( peers[i]->io != NULL )
1387            tr_bitfieldOr( pieces, peers[i]->have );
1388
1389    managerUnlock( (tr_peerMgr*)manager );
1390    return pieces;
1391}
1392
1393int
1394tr_peerMgrHasConnections( const tr_peerMgr * manager,
1395                          const uint8_t    * torrentHash )
1396{
1397    int ret;
1398    const Torrent * t;
1399    managerLock( (tr_peerMgr*)manager );
1400
1401    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1402    ret = t && tr_ptrArraySize( t->peers );
1403
1404    managerUnlock( (tr_peerMgr*)manager );
1405    return ret;
1406}
1407
1408static int
1409clientIsDownloadingFrom( const tr_peer * peer )
1410{
1411    return peer->clientIsInterested && !peer->clientIsChoked;
1412}
1413
1414static int
1415clientIsUploadingTo( const tr_peer * peer )
1416{
1417    return peer->peerIsInterested && !peer->peerIsChoked;
1418}
1419
1420void
1421tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1422                        const uint8_t    * torrentHash,
1423                        int              * setmePeersKnown,
1424                        int              * setmePeersConnected,
1425                        int              * setmePeersSendingToUs,
1426                        int              * setmePeersGettingFromUs,
1427                        int              * setmePeersFrom )
1428{
1429    int i, size;
1430    const Torrent * t;
1431    const tr_peer ** peers;
1432
1433    managerLock( (tr_peerMgr*)manager );
1434
1435    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1436    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1437
1438    *setmePeersKnown          = tr_ptrArraySize( t->pool );
1439    *setmePeersConnected      = 0;
1440    *setmePeersSendingToUs    = 0;
1441    *setmePeersGettingFromUs  = 0;
1442
1443    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1444        setmePeersFrom[i] = 0;
1445
1446    for( i=0; i<size; ++i )
1447    {
1448        const tr_peer * peer = peers[i];
1449        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1450
1451        if( peer->io == NULL ) /* not connected */
1452            continue;
1453
1454        ++*setmePeersConnected;
1455
1456        ++setmePeersFrom[atom->from];
1457
1458        if( clientIsUploadingTo( peer ) )
1459            ++*setmePeersGettingFromUs;
1460
1461        if( clientIsDownloadingFrom( peer ) )
1462            ++*setmePeersSendingToUs;
1463    }
1464
1465    managerUnlock( (tr_peerMgr*)manager );
1466}
1467
1468struct tr_peer_stat *
1469tr_peerMgrPeerStats( const tr_peerMgr  * manager,
1470                     const uint8_t     * torrentHash,
1471                     int               * setmeCount UNUSED )
1472{
1473    int i, size;
1474    const Torrent * t;
1475    tr_peer ** peers;
1476    tr_peer_stat * ret;
1477
1478    assert( manager != NULL );
1479    managerLock( (tr_peerMgr*)manager );
1480
1481    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1482    peers = getConnectedPeers( (Torrent*)t, &size );
1483    ret = tr_new0( tr_peer_stat, size );
1484
1485    for( i=0; i<size; ++i )
1486    {
1487        const tr_peer * peer = peers[i];
1488        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1489        tr_peer_stat * stat = ret + i;
1490
1491        tr_netNtop( &peer->in_addr, stat->addr, sizeof(stat->addr) );
1492        strlcpy( stat->client, (peer->client ? peer->client : ""), sizeof(stat->client) );
1493        stat->port             = peer->port;
1494        stat->from             = atom->from;
1495        stat->progress         = peer->progress;
1496        stat->isEncrypted      = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1497        stat->uploadToRate     = peer->rateToPeer;
1498        stat->downloadFromRate = peer->rateToClient;
1499        stat->isDownloading    = clientIsDownloadingFrom( peer );
1500        stat->isUploading      = clientIsUploadingTo( peer );
1501        stat->status           = peer->status;
1502    }
1503
1504    *setmeCount = size;
1505    tr_free( peers );
1506
1507    managerUnlock( (tr_peerMgr*)manager );
1508    return ret;
1509}
1510
1511/**
1512***
1513**/
1514
1515struct ChokeData
1516{
1517    uint8_t doUnchoke;
1518    uint8_t isInterested;
1519    uint32_t rate;
1520    tr_peer * peer;
1521};
1522
1523static int
1524compareChoke( const void * va, const void * vb )
1525{
1526    const struct ChokeData * a = va;
1527    const struct ChokeData * b = vb;
1528    return -tr_compareUint32( a->rate, b->rate );
1529}
1530
1531static int
1532isNew( const tr_peer * peer )
1533{
1534    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1535}
1536
1537static int
1538isSame( const tr_peer * peer )
1539{
1540    return peer && peer->client && strstr( peer->client, "Transmission" );
1541}
1542
1543/**
1544***
1545**/
1546
1547static double
1548getWeightedRate( const tr_peer * peer, int clientIsSeed )
1549{
1550    return (int)( 10.0 * ( clientIsSeed ? peer->rateToPeer
1551                                        : peer->rateToClient ) );
1552}
1553
1554static void
1555rechoke( Torrent * t )
1556{
1557    int i, n, peerCount, size, unchokedInterested;
1558    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1559    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1560    const int clientIsSeed = tr_torrentIsSeed( t->tor );
1561
1562    assert( torrentIsLocked( t ) );
1563   
1564    /* sort the peers by preference and rate */
1565    for( i=0, size=0; i<peerCount; ++i )
1566    {
1567        tr_peer * peer = peers[i];
1568        if( peer->progress >= 1.0 ) /* choke all seeds */
1569            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1570        else {
1571            struct ChokeData * node = &choke[size++];
1572            node->peer = peer;
1573            node->isInterested = peer->peerIsInterested;
1574            node->rate = getWeightedRate( peer, clientIsSeed );
1575        }
1576    }
1577
1578    qsort( choke, size, sizeof(struct ChokeData), compareChoke );
1579
1580    /**
1581     * Reciprocation and number of uploads capping is managed by unchoking
1582     * the N peers which have the best upload rate and are interested.
1583     * This maximizes the client's download rate. These N peers are
1584     * referred to as downloaders, because they are interested in downloading
1585     * from the client.
1586     *
1587     * Peers which have a better upload rate (as compared to the downloaders)
1588     * but aren't interested get unchoked. If they become interested, the
1589     * downloader with the worst upload rate gets choked. If a client has
1590     * a complete file, it uses its upload rate rather than its download
1591     * rate to decide which peers to unchoke.
1592     */
1593    unchokedInterested = 0;
1594    for( i=0; i<size && unchokedInterested<MAX_UNCHOKED_PEERS; ++i ) {
1595        choke[i].doUnchoke = 1;
1596        if( choke[i].isInterested )
1597            ++unchokedInterested;
1598    }
1599    n = i;
1600    while( i<size )
1601        choke[i++].doUnchoke = 0;
1602
1603    /* optimistic unchoke */
1604    if( i < size )
1605    {
1606        struct ChokeData * c;
1607        tr_ptrArray * randPool = tr_ptrArrayNew( );
1608        for( ; i<size; ++i )
1609        {
1610            const tr_peer * peer = choke[i].peer;
1611            int x=1, y;
1612            if( isNew( peer ) ) x *= 3;
1613            if( isSame( peer ) ) x *= 3;
1614            for( y=0; y<x; ++y )
1615                tr_ptrArrayAppend( randPool, choke );
1616        }
1617        i = tr_rand( tr_ptrArraySize( randPool ) );
1618        c = ( struct ChokeData* )tr_ptrArrayNth( randPool, i);
1619        c->doUnchoke = 1;
1620        t->optimistic = c->peer;
1621        tr_ptrArrayFree( randPool, NULL );
1622    }
1623
1624    for( i=0; i<size; ++i )
1625        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1626
1627    /* cleanup */
1628    tr_free( choke );
1629    tr_free( peers );
1630}
1631
1632static int
1633rechokePulse( void * vtorrent )
1634{
1635    Torrent * t = vtorrent;
1636    torrentLock( t );
1637    rechoke( t );
1638    torrentUnlock( t );
1639    return TRUE;
1640}
1641
1642/***
1643****
1644***/
1645
1646static int
1647swiftPulse( void * vtorrent )
1648{
1649    Torrent * t = vtorrent;
1650    torrentLock( t );
1651
1652    if( !tr_torrentIsSeed( t->tor ) )
1653    {
1654        int i;
1655        int peerCount = 0;
1656        int deadbeatCount = 0;
1657        tr_peer ** peers = getConnectedPeers( t, &peerCount );
1658        tr_peer ** deadbeats = tr_new( tr_peer*, peerCount );
1659
1660        const double ul_KiBsec = tr_rcRate( t->tor->upload );
1661        const double ul_KiB = ul_KiBsec * (SWIFT_PERIOD_MSEC/1000.0);
1662        const double ul_bytes = ul_KiB * 1024;
1663        const double freeCreditTotal = ul_bytes * SWIFT_LARGESSE;
1664        int freeCreditPerPeer;
1665
1666        for( i=0; i<peerCount; ++i ) {
1667            tr_peer * peer = peers[i];
1668            if( peer->credit <= 0 )
1669                deadbeats[deadbeatCount++] =  peer;
1670        }
1671
1672        freeCreditPerPeer = (int)( freeCreditTotal / deadbeatCount );
1673        for( i=0; i<deadbeatCount; ++i )
1674            deadbeats[i]->credit = freeCreditPerPeer;
1675
1676        tordbg( t, "%d deadbeats, "
1677            "who are each being granted %d bytes' credit "
1678            "for a total of %.1f KiB, "
1679            "%d%% of the torrent's ul speed %.1f\n",
1680            deadbeatCount, freeCreditPerPeer,
1681            ul_KiBsec*SWIFT_LARGESSE, (int)(SWIFT_LARGESSE*100), ul_KiBsec );
1682
1683        tr_free( deadbeats );
1684        tr_free( peers );
1685    }
1686
1687    torrentUnlock( t );
1688    return TRUE;
1689}
1690
1691/***
1692****
1693****  Life and Death
1694****
1695***/
1696
1697static int
1698shouldPeerBeClosed( const Torrent * t, const tr_peer * peer, int peerCount )
1699{
1700    const tr_torrent * tor = t->tor;
1701    const time_t now = time( NULL );
1702    const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1703
1704    /* if it's marked for purging, close it */
1705    if( peer->doPurge ) {
1706        tordbg( t, "purging peer %s because its doPurge flag is set", tr_peerIoAddrStr(&atom->addr,atom->port) );
1707        return TRUE;
1708    }
1709
1710    /* if we're both seeds and it's been long enough for a pex exchange, close it */
1711    if( 1 ) {
1712        const int clientIsSeed = tr_torrentIsSeed( tor );
1713        const int peerIsSeed = atom->flags & ADDED_F_SEED_FLAG;
1714        if( peerIsSeed && clientIsSeed && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30) ) ) {
1715            tordbg( t, "purging peer %s because we're both seeds", tr_peerIoAddrStr(&atom->addr,atom->port) );
1716            return TRUE;
1717        }
1718    }
1719
1720    /* disconnect if it's been too long since piece data has been transferred.
1721     * this is on a sliding scale based on number of available peers... */
1722    if( 1 ) {
1723        const int relaxStrictnessIfFewerThanN = (int)((getMaxPeerCount(tor) * 0.9) + 0.5);
1724        /* if we have >= relaxIfFewerThan, strictness is 100%.
1725         * if we have zero connections, strictness is 0% */
1726        const double strictness = peerCount >= relaxStrictnessIfFewerThanN
1727            ? 1.0
1728            : peerCount / (double)relaxStrictnessIfFewerThanN;
1729        const int lo = MIN_UPLOAD_IDLE_SECS;
1730        const int hi = MAX_UPLOAD_IDLE_SECS;
1731        const int limit = lo + ((hi-lo) * strictness);
1732        const time_t then = peer->pieceDataActivityDate;
1733        const int idleTime = then ? (now-then) : 0;
1734        if( idleTime > limit ) {
1735            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
1736                       tr_peerIoAddrStr(&atom->addr,atom->port), idleTime );
1737            return TRUE;
1738        }
1739    }
1740
1741    return FALSE;
1742}
1743
1744static tr_peer **
1745getPeersToClose( Torrent * t, int * setmeSize )
1746{
1747    int i, peerCount, outsize;
1748    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( t->peers, &peerCount );
1749    struct tr_peer ** ret = tr_new( tr_peer*, peerCount );
1750
1751    assert( torrentIsLocked( t ) );
1752
1753    for( i=outsize=0; i<peerCount; ++i )
1754        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
1755            ret[outsize++] = peers[i];
1756
1757    *setmeSize = outsize;
1758    return ret;
1759}
1760
1761static int
1762compareCandidates( const void * va, const void * vb )
1763{
1764    const struct peer_atom * a = * (const struct peer_atom**) va;
1765    const struct peer_atom * b = * (const struct peer_atom**) vb;
1766    int i;
1767
1768    if( a->piece_data_time > b->piece_data_time ) return -1;
1769    if( a->piece_data_time < b->piece_data_time ) return  1;
1770
1771    if(( i = tr_compareUint16( a->numFails, b->numFails )))
1772        return i;
1773
1774    if( a->time != b->time )
1775        return a->time < b->time ? -1 : 1;
1776
1777    return 0;
1778}
1779
1780static struct peer_atom **
1781getPeerCandidates( Torrent * t, int * setmeSize )
1782{
1783    int i, atomCount, retCount;
1784    struct peer_atom ** atoms;
1785    struct peer_atom ** ret;
1786    const time_t now = time( NULL );
1787    const int seed = tr_torrentIsSeed( t->tor );
1788
1789    assert( torrentIsLocked( t ) );
1790
1791    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &atomCount );
1792    ret = tr_new( struct peer_atom*, atomCount );
1793    for( i=retCount=0; i<atomCount; ++i )
1794    {
1795        struct peer_atom * atom = atoms[i];
1796
1797        /* peer fed us too much bad data ... we only keep it around
1798         * now to weed it out in case someone sends it to us via pex */
1799        if( atom->myflags & MYFLAG_BANNED )
1800            continue;
1801
1802        /* peer was unconnectable before, so we're not going to keep trying.
1803         * this is needs a separate flag from `banned', since if they try
1804         * to connect to us later, we'll let them in */
1805        if( atom->myflags & MYFLAG_UNREACHABLE )
1806            continue;
1807
1808        /* we don't need two connections to the same peer... */
1809        if( peerIsInUse( t, &atom->addr ) )
1810            continue;
1811
1812        /* no need to connect if we're both seeds... */
1813        if( seed && (atom->flags & ADDED_F_SEED_FLAG) )
1814            continue;
1815
1816        /* we're wasting our time trying to connect to this bozo. */
1817        if( atom->numFails > 3 )
1818            continue;
1819
1820        /* If we were connected to this peer recently and transferring
1821         * piece data, try to reconnect -- network troubles may have
1822         * disconnected us.  but if we weren't sharing piece data,
1823         * hold off on this peer to give another one a try instead */
1824        if( ( now - atom->piece_data_time ) > 30 )
1825        {
1826            int minWait = (60 * 10); /* ten minutes */
1827            int maxWait = (60 * 30); /* thirty minutes */
1828            int wait = atom->numFails * minWait;
1829            if( wait < minWait ) wait = minWait;
1830            if( wait > maxWait ) wait = maxWait;
1831            if( ( now - atom->time ) < wait ) {
1832                tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
1833                        i, tr_peerIoAddrStr(&atom->addr,atom->port), wait );
1834                continue;
1835            }
1836        }
1837
1838        ret[retCount++] = atom;
1839    }
1840
1841    qsort( ret, retCount, sizeof(struct peer_atom*), compareCandidates );
1842    *setmeSize = retCount;
1843    return ret;
1844}
1845
1846static int
1847reconnectPulse( void * vtorrent )
1848{
1849    Torrent * t = vtorrent;
1850    static time_t prevTime = 0;
1851    static int newConnectionsThisSecond = 0;
1852    time_t now;
1853
1854    torrentLock( t );
1855
1856    now = time( NULL );
1857    if( prevTime != now )
1858    {
1859        prevTime = now;
1860        newConnectionsThisSecond = 0;
1861    }
1862
1863    if( !t->isRunning )
1864    {
1865        removeAllPeers( t );
1866    }
1867    else
1868    {
1869        int i, nCandidates, nBad;
1870        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
1871        struct tr_peer ** connections = getPeersToClose( t, &nBad );
1872
1873        if( nBad || nCandidates )
1874            tordbg( t, "reconnect pulse for [%s]: %d bad connections, "
1875                       "%d connection candidates, %d atoms, max per pulse is %d",
1876                       t->tor->info.name, nBad, nCandidates,
1877                       tr_ptrArraySize(t->pool),
1878                       (int)MAX_RECONNECTIONS_PER_PULSE );
1879
1880        /* disconnect some peers.
1881           if we got transferred piece data, then they might be good peers,
1882           so reset their `numFails' weight to zero.  otherwise we connected
1883           to them fruitlessly, so mark it as another fail */
1884        for( i=0; i<nBad; ++i ) {
1885            tr_peer * peer = connections[i];
1886            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1887            if( peer->pieceDataActivityDate )
1888                atom->numFails = 0;
1889            else
1890                ++atom->numFails;
1891            removePeer( t, peer );
1892        }
1893
1894        /* add some new ones */
1895        for( i=0;    i < nCandidates
1896                  && i < MAX_RECONNECTIONS_PER_PULSE
1897                  && newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND; ++i )
1898        {
1899            tr_peerMgr * mgr = t->manager;
1900            struct peer_atom * atom = candidates[i];
1901            tr_peerIo * io;
1902
1903            tordbg( t, "Starting an OUTGOING connection with %s",
1904                       tr_peerIoAddrStr( &atom->addr, atom->port ) );
1905
1906            io = tr_peerIoNewOutgoing( mgr->handle, &atom->addr, atom->port, t->hash );
1907            if( io == NULL )
1908            {
1909                atom->myflags |= MYFLAG_UNREACHABLE;
1910            }
1911            else
1912            {
1913                tr_handshake * handshake = tr_handshakeNew( io,
1914                                                            mgr->handle->encryptionMode,
1915                                                            myHandshakeDoneCB,
1916                                                            mgr );
1917
1918                assert( tr_peerIoGetTorrentHash( io ) != NULL );
1919
1920                ++newConnectionsThisSecond;
1921
1922                tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake, handshakeCompare );
1923            }
1924
1925            atom->time = time( NULL );
1926        }
1927
1928        /* cleanup */
1929        tr_free( connections );
1930        tr_free( candidates );
1931    }
1932
1933    torrentUnlock( t );
1934    return TRUE;
1935}
Note: See TracBrowser for help on using the repository browser.