source: branches/1.0x/libtransmission/peer-mgr.c @ 4854

Last change on this file since 4854 was 4854, checked in by charles, 15 years ago

(1.0x) remove unused "max unchoked peers" feature. fix obsolete TR_FLAG_SAVE comment. This was r4813 in trunk.

  • Property svn:keywords set to Date Rev Author Id
File size: 54.4 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 4854 2008-01-28 17:53:32Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp, strstr */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17#include <limits.h> /* INT_MAX */
18
19#include <libgen.h> /* basename */
20
21#include <event.h>
22
23#include "transmission.h"
24#include "clients.h"
25#include "completion.h"
26#include "crypto.h"
27#include "handshake.h"
28#include "net.h"
29#include "peer-io.h"
30#include "peer-mgr.h"
31#include "peer-mgr-private.h"
32#include "peer-msgs.h"
33#include "platform.h"
34#include "ptrarray.h"
35#include "ratecontrol.h"
36#include "shared.h"
37#include "torrent.h"
38#include "trcompat.h" /* strlcpy */
39#include "trevent.h"
40#include "utils.h"
41
42/**
43*** The "SWIFT" system is described by Karthik Tamilmani,
44*** Vinay Pai, and Alexander Mohr of Stony Brook University
45*** in their paper "SWIFT: A System With Incentives For Trading"
46*** http://citeseer.ist.psu.edu/tamilmani04swift.html
47***
48*** More SWIFT constants are defined in peer-mgr-private.h
49**/
50
51/**
52 * Allow new peers to download this many bytes from
53 * us when getting started.  This can prevent gridlock
54 * with other peers using tit-for-tat algorithms
55 */
56static const int SWIFT_INITIAL_CREDIT = 64 * 1024; /* 64 KiB */
57
58/**
59 * We expend a fraction of our torrent's total upload speed
60 * on largesse by uniformly distributing free credit to
61 * all of our peers.  This too helps prevent gridlock.
62 */
63static const double SWIFT_LARGESSE = 0.15; /* 15% of our UL */
64
65/**
66 * How frequently to extend largesse-based credit
67 */
68static const int SWIFT_PERIOD_MSEC = 5000;
69
70
71enum
72{
73    /* how frequently to change which peers are choked */
74    RECHOKE_PERIOD_MSEC = (10 * 1000),
75
76    /* how frequently to refill peers' request lists */
77    REFILL_PERIOD_MSEC = 666,
78
79    /* following the BT spec, we consider ourselves `snubbed' if
80     * we're we don't get piece data from a peer in this long */
81    SNUBBED_SEC = 60,
82
83    /* when many peers are available, keep idle ones this long */
84    MIN_UPLOAD_IDLE_SECS = (60 * 3),
85
86    /* when few peers are available, keep idle ones this long */
87    MAX_UPLOAD_IDLE_SECS = (60 * 10),
88
89    /* how frequently to decide which peers live and die */
90    RECONNECT_PERIOD_MSEC = (2 * 1000),
91
92    /* max # of peers to ask fer per torrent per reconnect pulse */
93    MAX_RECONNECTIONS_PER_PULSE = 1,
94
95    /* max number of peers to ask for per second overall.
96     * this throttle is to avoid overloading the router */
97    MAX_CONNECTIONS_PER_SECOND = 8,
98
99    /* number of unchoked peers per torrent */
100    MAX_UNCHOKED_PEERS = 12,
101
102    /* corresponds to ut_pex's added.f flags */
103    ADDED_F_ENCRYPTION_FLAG = 1,
104
105    /* corresponds to ut_pex's added.f flags */
106    ADDED_F_SEED_FLAG = 2,
107
108    /* number of bad pieces a peer is allowed to send before we ban them */
109    MAX_BAD_PIECES_PER_PEER = 3,
110
111    /* use for bitwise operations w/peer_atom.myflags */
112    MYFLAG_BANNED = 1,
113
114    /* unreachable for now... but not banned.  if they try to connect to us it's okay */
115    MYFLAG_UNREACHABLE = 2
116};
117
118
119/**
120***
121**/
122
123/* We keep one of these for every peer we know about, whether
124 * it's connected or not, so the struct must be small.
125 * When our current connections underperform, we dip back
126 * into this list for new ones. */
127struct peer_atom
128{   
129    uint8_t from;
130    uint8_t flags; /* these match the added_f flags */
131    uint8_t myflags; /* flags that aren't defined in added_f */
132    uint16_t port;
133    uint16_t numFails;
134    struct in_addr addr;
135    time_t time;
136    time_t piece_data_time;
137};
138
139typedef struct
140{
141    uint8_t hash[SHA_DIGEST_LENGTH];
142    tr_ptrArray * outgoingHandshakes; /* tr_handshake */
143    tr_ptrArray * pool; /* struct peer_atom */
144    tr_ptrArray * peers; /* tr_peer */
145    tr_timer * reconnectTimer;
146    tr_timer * rechokeTimer;
147    tr_timer * refillTimer;
148    tr_timer * swiftTimer;
149    tr_torrent * tor;
150    tr_peer * optimistic; /* the optimistic peer, or NULL if none */
151    tr_bitfield * requested;
152
153    unsigned int isRunning : 1;
154
155    struct tr_peerMgr * manager;
156}
157Torrent;
158
159struct tr_peerMgr
160{
161    tr_handle * handle;
162    tr_ptrArray * torrents; /* Torrent */
163    tr_ptrArray * incomingHandshakes; /* tr_handshake */
164};
165
166/**
167***
168**/
169
170static void
171myDebug( const char * file, int line, const Torrent * t, const char * fmt, ... )
172{
173    FILE * fp = tr_getLog( );
174    if( fp != NULL )
175    {
176        va_list args;
177        char timestr[64];
178        struct evbuffer * buf = evbuffer_new( );
179        char * myfile = tr_strdup( file );
180
181        evbuffer_add_printf( buf, "[%s] ", tr_getLogTimeStr( timestr, sizeof(timestr) ) );
182        if( t != NULL )
183            evbuffer_add_printf( buf, "%s ", t->tor->info.name );
184        va_start( args, fmt );
185        evbuffer_add_vprintf( buf, fmt, args );
186        va_end( args );
187        evbuffer_add_printf( buf, " (%s:%d)\n", basename(myfile), line );
188        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
189
190        tr_free( myfile );
191        evbuffer_free( buf );
192    }
193}
194
195#define tordbg(t, fmt...) myDebug(__FILE__, __LINE__, t, ##fmt )
196
197/**
198***
199**/
200
201static void
202managerLock( struct tr_peerMgr * manager )
203{
204    tr_globalLock( manager->handle );
205}
206static void
207managerUnlock( struct tr_peerMgr * manager )
208{
209    tr_globalUnlock( manager->handle );
210}
211static void
212torrentLock( Torrent * torrent )
213{
214    managerLock( torrent->manager );
215}
216static void
217torrentUnlock( Torrent * torrent )
218{
219    managerUnlock( torrent->manager );
220}
221static int
222torrentIsLocked( const Torrent * t )
223{
224    return tr_globalIsLocked( t->manager->handle );
225}
226
227/**
228***
229**/
230
231static int
232compareAddresses( const struct in_addr * a, const struct in_addr * b )
233{
234    return tr_compareUint32( a->s_addr, b->s_addr );
235}
236
237static int
238handshakeCompareToAddr( const void * va, const void * vb )
239{
240    const tr_handshake * a = va;
241    return compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
242}
243
244static int
245handshakeCompare( const void * a, const void * b )
246{
247    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
248}
249
250static tr_handshake*
251getExistingHandshake( tr_ptrArray * handshakes, const struct in_addr * in_addr )
252{
253    return tr_ptrArrayFindSorted( handshakes,
254                                  in_addr,
255                                  handshakeCompareToAddr );
256}
257
258static int
259comparePeerAtomToAddress( const void * va, const void * vb )
260{
261    const struct peer_atom * a = va;
262    return compareAddresses( &a->addr, vb );
263}
264
265static int
266comparePeerAtoms( const void * va, const void * vb )
267{
268    const struct peer_atom * b = vb;
269    return comparePeerAtomToAddress( va, &b->addr );
270}
271
272/**
273***
274**/
275
276static int
277torrentCompare( const void * va, const void * vb )
278{
279    const Torrent * a = va;
280    const Torrent * b = vb;
281    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
282}
283
284static int
285torrentCompareToHash( const void * va, const void * vb )
286{
287    const Torrent * a = va;
288    const uint8_t * b_hash = vb;
289    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
290}
291
292static Torrent*
293getExistingTorrent( tr_peerMgr * manager, const uint8_t * hash )
294{
295    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
296                                             hash,
297                                             torrentCompareToHash );
298}
299
300static int
301peerCompare( const void * va, const void * vb )
302{
303    const tr_peer * a = va;
304    const tr_peer * b = vb;
305    return compareAddresses( &a->in_addr, &b->in_addr );
306}
307
308static int
309peerCompareToAddr( const void * va, const void * vb )
310{
311    const tr_peer * a = va;
312    return compareAddresses( &a->in_addr, vb );
313}
314
315static tr_peer*
316getExistingPeer( Torrent * torrent, const struct in_addr * in_addr )
317{
318    assert( torrentIsLocked( torrent ) );
319    assert( in_addr != NULL );
320
321    return (tr_peer*) tr_ptrArrayFindSorted( torrent->peers,
322                                             in_addr,
323                                             peerCompareToAddr );
324}
325
326static struct peer_atom*
327getExistingAtom( const Torrent * t, const struct in_addr * addr )
328{
329    assert( torrentIsLocked( t ) );
330    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
331}
332
333static int
334peerIsInUse( const Torrent * ct, const struct in_addr * addr )
335{
336    Torrent * t = (Torrent*) ct;
337
338    assert( torrentIsLocked ( t ) );
339
340    return getExistingPeer( t, addr )
341        || getExistingHandshake( t->outgoingHandshakes, addr )
342        || getExistingHandshake( t->manager->incomingHandshakes, addr );
343}
344
345static tr_peer*
346peerConstructor( const struct in_addr * in_addr )
347{
348    tr_peer * p;
349    p = tr_new0( tr_peer, 1 );
350    p->credit = SWIFT_INITIAL_CREDIT;
351    p->rcToClient = tr_rcInit( );
352    p->rcToPeer = tr_rcInit( );
353    memcpy( &p->in_addr, in_addr, sizeof(struct in_addr) );
354    return p;
355}
356
357static tr_peer*
358getPeer( Torrent * torrent, const struct in_addr * in_addr )
359{
360    tr_peer * peer;
361
362    assert( torrentIsLocked( torrent ) );
363
364    peer = getExistingPeer( torrent, in_addr );
365
366    if( peer == NULL ) {
367        peer = peerConstructor( in_addr );
368        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
369    }
370
371    return peer;
372}
373
374static void
375peerDestructor( tr_peer * peer )
376{
377    assert( peer != NULL );
378    assert( peer->msgs != NULL );
379
380    tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
381    tr_peerMsgsFree( peer->msgs );
382
383    tr_peerIoFree( peer->io );
384
385    tr_bitfieldFree( peer->have );
386    tr_bitfieldFree( peer->blame );
387    tr_rcClose( peer->rcToClient );
388    tr_rcClose( peer->rcToPeer );
389    tr_free( peer->client );
390    tr_free( peer );
391}
392
393static void
394removePeer( Torrent * t, tr_peer * peer )
395{
396    tr_peer * removed;
397    struct peer_atom * atom;
398
399    assert( torrentIsLocked( t ) );
400
401    atom = getExistingAtom( t, &peer->in_addr );
402    assert( atom != NULL );
403    atom->time = time( NULL );
404
405    removed = tr_ptrArrayRemoveSorted  ( t->peers, peer, peerCompare );
406    assert( removed == peer );
407    peerDestructor( removed );
408}
409
410static void
411removeAllPeers( Torrent * t )
412{
413    while( !tr_ptrArrayEmpty( t->peers ) )
414        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
415}
416
417static void
418torrentDestructor( Torrent * t )
419{
420    uint8_t hash[SHA_DIGEST_LENGTH];
421
422    assert( t != NULL );
423    assert( !t->isRunning );
424    assert( t->peers != NULL );
425    assert( torrentIsLocked( t ) );
426    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
427    assert( tr_ptrArrayEmpty( t->peers ) );
428
429    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
430
431    tr_timerFree( &t->reconnectTimer );
432    tr_timerFree( &t->rechokeTimer );
433    tr_timerFree( &t->refillTimer );
434    tr_timerFree( &t->swiftTimer );
435
436    tr_bitfieldFree( t->requested );
437    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
438    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
439    tr_ptrArrayFree( t->peers, NULL );
440
441    tr_free( t );
442}
443
444static Torrent*
445torrentConstructor( tr_peerMgr * manager, tr_torrent * tor )
446{
447    Torrent * t;
448
449    t = tr_new0( Torrent, 1 );
450    t->manager = manager;
451    t->tor = tor;
452    t->pool = tr_ptrArrayNew( );
453    t->peers = tr_ptrArrayNew( );
454    t->outgoingHandshakes = tr_ptrArrayNew( );
455    t->requested = tr_bitfieldNew( tor->blockCount );
456    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
457
458    return t;
459}
460
461/**
462 * For explanation, see http://www.bittorrent.org/fast_extensions.html
463 * Also see the "test-allowed-set" unit test
464 */
465struct tr_bitfield *
466tr_peerMgrGenerateAllowedSet( const uint32_t         k,         /* number of pieces in set */
467                              const uint32_t         sz,        /* number of pieces in torrent */
468                              const uint8_t        * infohash,  /* torrent's SHA1 hash*/
469                              const struct in_addr * ip )       /* peer's address */
470{
471    uint8_t w[SHA_DIGEST_LENGTH + 4];
472    uint8_t x[SHA_DIGEST_LENGTH];
473    tr_bitfield_t * a;
474    uint32_t a_size;
475
476    *(uint32_t*)w = ntohl( htonl(ip->s_addr) & 0xffffff00 );   /* (1) */
477    memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );              /* (2) */
478    tr_sha1( x, w, sizeof( w ), NULL );                        /* (3) */
479
480    a = tr_bitfieldNew( sz );
481    a_size = 0;
482   
483    while( a_size < k )
484    {
485        int i;
486        for ( i=0; i<5 && a_size<k; ++i )                      /* (4) */
487        {
488            uint32_t j = i * 4;                                /* (5) */
489            uint32_t y = ntohl(*(uint32_t*)(x+j));             /* (6) */
490            uint32_t index = y % sz;                           /* (7) */
491            if ( !tr_bitfieldHas( a, index ) ) {               /* (8) */
492                tr_bitfieldAdd( a, index );                    /* (9) */
493                ++a_size;
494            }
495        }
496        tr_sha1( x, x, sizeof( x ), NULL );                    /* (3) */
497    }
498   
499    return a;
500}
501
502tr_peerMgr*
503tr_peerMgrNew( tr_handle * handle )
504{
505    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
506    m->handle = handle;
507    m->torrents = tr_ptrArrayNew( );
508    m->incomingHandshakes = tr_ptrArrayNew( );
509    return m;
510}
511
512void
513tr_peerMgrFree( tr_peerMgr * manager )
514{
515    managerLock( manager );
516
517    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
518     * the item from manager->handshakes, so this is a little roundabout... */
519    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
520        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
521    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
522
523    /* free the torrents. */
524    tr_ptrArrayFree( manager->torrents, (PtrArrayForeachFunc)torrentDestructor );
525
526    managerUnlock( manager );
527    tr_free( manager );
528}
529
530static tr_peer**
531getConnectedPeers( Torrent * t, int * setmeCount )
532{
533    int i, peerCount, connectionCount;
534    tr_peer **peers;
535    tr_peer **ret;
536
537    assert( torrentIsLocked( t ) );
538
539    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
540    ret = tr_new( tr_peer*, peerCount );
541
542    for( i=connectionCount=0; i<peerCount; ++i )
543        if( peers[i]->msgs != NULL )
544            ret[connectionCount++] = peers[i];
545
546    *setmeCount = connectionCount;
547    return ret;
548}
549
550/***
551****  Refill
552***/
553
554struct tr_refill_piece
555{
556    tr_priority_t priority;
557    int percentDone;
558    uint16_t random;
559    uint32_t piece;
560    uint32_t peerCount;
561    uint32_t fastAllowed;
562    uint32_t suggested;
563};
564
565static int
566compareRefillPiece (const void * aIn, const void * bIn)
567{
568    const struct tr_refill_piece * a = aIn;
569    const struct tr_refill_piece * b = bIn;
570   
571    /* if one piece has a higher priority, it goes first */
572    if( a->priority != b->priority )
573        return a->priority > b->priority ? -1 : 1;
574
575    /* try to fill partial pieces */
576    if( a->percentDone != b->percentDone )
577        return a->percentDone > b->percentDone ? -1 : 1;
578   
579    /* if one *might be* fastallowed to us, get it first...
580     * I'm putting it on top so we prioritize those pieces at
581     * startup, then we'll have them, and we'll be denied access
582     * to them */
583    if (a->fastAllowed != b->fastAllowed)
584        return a->fastAllowed < b->fastAllowed ? -1 : 1;
585   
586    /* otherwise if one was suggested to us, get it */
587    if (a->suggested != b->suggested)
588        return a->suggested < b->suggested ? -1 : 1;
589   
590    /* otherwise if one has fewer peers, it goes first */
591    if (a->peerCount != b->peerCount)
592        return a->peerCount < b->peerCount ? -1 : 1;
593
594    /* otherwise go with our random seed */
595    return tr_compareUint16( a->random, b->random );
596}
597
598static int
599isPieceInteresting( const tr_torrent  * tor,
600                    int                 piece )
601{
602    if( tor->info.pieces[piece].dnd ) /* we don't want it */
603        return 0;
604
605    if( tr_cpPieceIsComplete( tor->completion, piece ) ) /* we have it */
606        return 0;
607
608    return 1;
609}
610
611static uint32_t*
612getPreferredPieces( Torrent     * t,
613                    uint32_t    * pieceCount )
614{
615    const tr_torrent * tor = t->tor;
616    const tr_info * inf = &tor->info;
617    int i;
618    uint32_t poolSize = 0;
619    uint32_t * pool = tr_new( uint32_t, inf->pieceCount );
620    int peerCount;
621    tr_peer** peers;
622
623    assert( torrentIsLocked( t ) );
624
625    peers = getConnectedPeers( t, &peerCount );
626
627    for( i=0; i<inf->pieceCount; ++i )
628        if( isPieceInteresting( tor, i ) )
629            pool[poolSize++] = i;
630
631    /* sort the pool from most interesting to least... */
632    if( poolSize > 1 )
633    {
634        uint32_t j;
635        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
636
637        for( j=0; j<poolSize; ++j )
638        {
639            int k;
640            const int piece = pool[j];
641            struct tr_refill_piece * setme = p + j;
642
643            setme->piece = piece;
644            setme->priority = inf->pieces[piece].priority;
645            setme->peerCount = 0;
646            setme->fastAllowed = 0;
647            setme->random = tr_rand( UINT16_MAX );
648            setme->percentDone = (int)( 100.0 * tr_cpPercentBlocksInPiece( tor->completion, piece ) );
649
650            for( k=0; k<peerCount; ++k ) {
651                const tr_peer * peer = peers[k];
652                if( peer->peerIsInterested && !peer->clientIsChoked && tr_bitfieldHas( peer->have, piece ) )
653                    ++setme->peerCount;
654                /* The fast peer extension doesn't force a peer to actually HAVE a fast-allowed piece,
655                    but we're guaranteed to get the same pieces from different peers,
656                    so we'll build a list and pray one actually have this one */
657                setme->fastAllowed = tr_peerMsgsIsPieceFastAllowed( peer->msgs, i );
658                /* Also, if someone SUGGESTed a piece to us, prioritize it over non-suggested others
659                 */
660                setme->suggested   = tr_peerMsgsIsPieceSuggested( peer->msgs, i );
661            }
662        }
663
664        qsort( p, poolSize, sizeof(struct tr_refill_piece), compareRefillPiece );
665
666        for( j=0; j<poolSize; ++j )
667            pool[j] = p[j].piece;
668
669        tr_free( p );
670    }
671
672    tr_free( peers );
673
674    *pieceCount = poolSize;
675    return pool;
676}
677
678static uint64_t*
679getPreferredBlocks( Torrent * t, uint64_t * setmeCount )
680{
681    int s;
682    uint32_t i;
683    uint32_t pieceCount;
684    uint32_t blockCount;
685    uint32_t unreqCount[3], reqCount[3];
686    uint32_t * pieces;
687    uint64_t * ret, * walk;
688    uint64_t * unreq[3], *req[3];
689    const tr_torrent * tor = t->tor;
690
691    assert( torrentIsLocked( t ) );
692
693    pieces = getPreferredPieces( t, &pieceCount );
694
695    /**
696     * Now we walk through those preferred pieces to find all the blocks
697     * are still missing from them.  We put unrequested blocks first,
698     * of course, but by including requested blocks afterwards, endgame
699     * handling happens naturally.
700     *
701     * By doing this once per priority we also effectively get an endgame
702     * mode for each priority level.  The helps keep high priority files
703     * from getting stuck at 99% due of unresponsive peers.
704     */
705
706    /* make temporary bins for the four tiers of blocks */
707    for( i=0; i<3; ++i ) {
708        req[i] = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
709        reqCount[i] = 0;
710        unreq[i] = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
711        unreqCount[i] = 0;
712    }
713
714    /* sort the blocks into our temp bins */
715    for( i=blockCount=0; i<pieceCount; ++i )
716    {
717        const uint32_t index = pieces[i];
718        const int priorityIndex = tor->info.pieces[index].priority + 1;
719        const int begin = tr_torPieceFirstBlock( tor, index );
720        const int end = begin + tr_torPieceCountBlocks( tor, (int)index );
721        int block;
722
723        for( block=begin; block<end; ++block )
724        {
725            if( tr_cpBlockIsComplete( tor->completion, block ) )
726                continue;
727
728            ++blockCount;
729
730            if( tr_bitfieldHas( t->requested, block ) )
731            {
732                const uint32_t n = reqCount[priorityIndex]++;
733                req[priorityIndex][n] = block;
734            }
735            else
736            {
737                const uint32_t n = unreqCount[priorityIndex]++;
738                unreq[priorityIndex][n] = block;
739            }
740        }
741    }
742
743    /* join the bins together, going from highest priority to lowest so
744     * the the blocks we want to request first will be first in the list */
745    ret = walk = tr_new( uint64_t, blockCount );
746    for( s=2; s>=0; --s ) {
747        memcpy( walk, unreq[s], sizeof(uint64_t) * unreqCount[s] );
748        walk += unreqCount[s];
749        memcpy( walk, req[s], sizeof(uint64_t) * reqCount[s] );
750        walk += reqCount[s];
751    }
752    assert( ( walk - ret ) == ( int )blockCount );
753    *setmeCount = blockCount;
754
755    /* cleanup */
756    tr_free( pieces );
757    for( i=0; i<3; ++i ) {
758        tr_free( unreq[i] );
759        tr_free( req[i] );
760    }
761    return ret;
762}
763
764static int
765refillPulse( void * vtorrent )
766{
767    Torrent * t = vtorrent;
768    tr_torrent * tor = t->tor;
769    uint32_t i;
770    int peerCount;
771    tr_peer ** peers;
772    uint64_t blockCount;
773    uint64_t * blocks;
774
775    if( !t->isRunning )
776        return TRUE;
777    if( tr_torrentIsSeed( t->tor ) )
778        return TRUE;
779
780    torrentLock( t );
781    tordbg( t, "Refilling Request Buffers..." );
782
783    blocks = getPreferredBlocks( t, &blockCount );
784    peers = getConnectedPeers( t, &peerCount );
785
786    for( i=0; peerCount && i<blockCount; ++i )
787    {
788        const uint64_t block = blocks[i];
789        const uint32_t index = tr_torBlockPiece( tor, block );
790        const uint32_t begin = (block * tor->blockSize) - (index * tor->info.pieceSize);
791        const uint32_t length = tr_torBlockCountBytes( tor, (int)block );
792        int j;
793        assert( _tr_block( tor, index, begin ) == (int)block );
794        assert( begin < (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
795        assert( (begin + length) <= (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
796
797        /* find a peer who can ask for this block */
798        for( j=0; j<peerCount; )
799        {
800            const int val = tr_peerMsgsAddRequest( peers[j]->msgs, index, begin, length );
801            switch( val )
802            {
803                case TR_ADDREQ_FULL: 
804                case TR_ADDREQ_CLIENT_CHOKED:
805                    memmove( peers+j, peers+j+1, sizeof(tr_peer*)*(--peerCount-j) );
806                    break;
807
808                case TR_ADDREQ_MISSING: 
809                case TR_ADDREQ_DUPLICATE: 
810                    ++j;
811                    break;
812
813                case TR_ADDREQ_OK:
814                    tr_bitfieldAdd( t->requested, block );
815                    j = peerCount;
816                    break;
817
818                default:
819                    assert( 0 && "unhandled value" );
820                    break;
821            }
822        }
823    }
824
825    /* cleanup */
826    tr_free( peers );
827    tr_free( blocks );
828
829    t->refillTimer = NULL;
830    torrentUnlock( t );
831    return FALSE;
832}
833
834static void
835broadcastClientHave( Torrent * t, uint32_t index )
836{
837    int i, size;
838    tr_peer ** peers;
839
840    assert( torrentIsLocked( t ) );
841
842    peers = getConnectedPeers( t, &size );
843    for( i=0; i<size; ++i )
844        tr_peerMsgsHave( peers[i]->msgs, index );
845    tr_free( peers );
846}
847
848static void
849broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
850{
851    int i, size;
852    tr_peer ** peers;
853
854    assert( torrentIsLocked( t ) );
855
856    peers = getConnectedPeers( t, &size );
857    for( i=0; i<size; ++i )
858        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
859    tr_free( peers );
860}
861
862static void
863addStrike( Torrent * t, tr_peer * peer )
864{
865    tordbg( t, "increasing peer %s strike count to %d", tr_peerIoAddrStr(&peer->in_addr,peer->port), peer->strikes+1 );
866
867    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
868    {
869        struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
870        atom->myflags |= MYFLAG_BANNED;
871        peer->doPurge = 1;
872        tordbg( t, "banning peer %s", tr_peerIoAddrStr(&atom->addr,atom->port) );
873    }
874}
875
876static void
877msgsCallbackFunc( void * vpeer, void * vevent, void * vt )
878{
879    tr_peer * peer = vpeer;
880    Torrent * t = (Torrent *) vt;
881    const tr_peermsgs_event * e = (const tr_peermsgs_event *) vevent;
882
883    torrentLock( t );
884
885    switch( e->eventType )
886    {
887        case TR_PEERMSG_NEED_REQ:
888            if( t->refillTimer == NULL )
889                t->refillTimer = tr_timerNew( t->manager->handle,
890                                              refillPulse, t,
891                                              REFILL_PERIOD_MSEC );
892            break;
893
894        case TR_PEERMSG_CANCEL:
895            tr_bitfieldRem( t->requested, _tr_block( t->tor, e->pieceIndex, e->offset ) );
896            break;
897
898        case TR_PEERMSG_PIECE_DATA: {
899            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
900            atom->piece_data_time = time( NULL );
901            break;
902        }
903
904        case TR_PEERMSG_CLIENT_HAVE:
905            broadcastClientHave( t, e->pieceIndex );
906            tr_torrentRecheckCompleteness( t->tor );
907            break;
908
909        case TR_PEERMSG_PEER_PROGRESS: {
910            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
911            const int peerIsSeed = e->progress >= 1.0;
912            if( peerIsSeed ) {
913                tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr(&atom->addr,atom->port) );
914                atom->flags |= ADDED_F_SEED_FLAG;
915            } else {
916                tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr(&atom->addr,atom->port) );
917                atom->flags &= ~ADDED_F_SEED_FLAG;
918            } break;
919        }
920
921        case TR_PEERMSG_CLIENT_BLOCK:
922            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
923            break;
924
925        case TR_PEERMSG_GOT_ASSERT_ERROR:
926            addStrike( t, peer );
927            peer->doPurge = 1;
928            break;
929
930        case TR_PEERMSG_ERROR:
931            peer->doPurge = 1;
932            break;
933
934        default:
935            assert(0);
936    }
937
938    torrentUnlock( t );
939}
940
941static void
942ensureAtomExists( Torrent * t, const struct in_addr * addr, uint16_t port, uint8_t flags, uint8_t from )
943{
944    if( getExistingAtom( t, addr ) == NULL )
945    {
946        struct peer_atom * a;
947        a = tr_new0( struct peer_atom, 1 );
948        a->addr = *addr;
949        a->port = port;
950        a->flags = flags;
951        a->from = from;
952        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr(&a->addr,a->port) );
953        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
954    }
955}
956
957static int
958getMaxPeerCount( const tr_torrent * tor UNUSED )
959{
960#if 0
961    return t->tor->maxConnectedPeers;
962#else
963    return 50;
964#endif
965}
966
967/* FIXME: this is kind of a mess. */
968static void
969myHandshakeDoneCB( tr_handshake    * handshake,
970                   tr_peerIo       * io,
971                   int               isConnected,
972                   const uint8_t   * peer_id,
973                   void            * vmanager )
974{
975    int ok = isConnected;
976    uint16_t port;
977    const struct in_addr * addr;
978    tr_peerMgr * manager = (tr_peerMgr*) vmanager;
979    Torrent * t;
980    tr_handshake * ours;
981
982    assert( io != NULL );
983    assert( isConnected==0 || isConnected==1 );
984
985    t = tr_peerIoHasTorrentHash( io )
986        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
987        : NULL;
988
989    if( tr_peerIoIsIncoming ( io ) )
990        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
991                                        handshake, handshakeCompare );
992    else if( t != NULL )
993        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
994                                        handshake, handshakeCompare );
995    else
996        ours = handshake;
997
998    assert( ours != NULL );
999    assert( ours == handshake );
1000
1001    if( t != NULL )
1002        torrentLock( t );
1003
1004    addr = tr_peerIoGetAddress( io, &port );
1005
1006    if( !ok || !t || !t->isRunning )
1007    {
1008        if( t ) {
1009            struct peer_atom * atom = getExistingAtom( t, addr );
1010            if( atom )
1011                ++atom->numFails;
1012        }
1013
1014        tr_peerIoFree( io );
1015    }
1016    else /* looking good */
1017    {
1018        struct peer_atom * atom;
1019        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1020        atom = getExistingAtom( t, addr );
1021
1022        if( atom->myflags & MYFLAG_BANNED )
1023        {
1024            tordbg( t, "banned peer %s tried to reconnect", tr_peerIoAddrStr(&atom->addr,atom->port) );
1025            tr_peerIoFree( io );
1026        }
1027        else if( tr_ptrArraySize( t->peers ) >= getMaxPeerCount( t->tor ) )
1028        {
1029            tr_peerIoFree( io );
1030        }
1031        else
1032        {
1033            tr_peer * peer = getExistingPeer( t, addr );
1034
1035            if( peer != NULL ) /* we already have this peer */
1036            {
1037                tr_peerIoFree( io );
1038            }
1039            else
1040            {
1041                peer = getPeer( t, addr );
1042                tr_free( peer->client );
1043                peer->client = peer_id ? tr_clientForId( peer_id ) : NULL;
1044                peer->port = port;
1045                peer->io = io;
1046                peer->msgs = tr_peerMsgsNew( t->tor, peer, msgsCallbackFunc, t, &peer->msgsTag );
1047                atom->time = time( NULL );
1048            }
1049        }
1050    }
1051
1052    if( t != NULL )
1053        torrentUnlock( t );
1054}
1055
1056void
1057tr_peerMgrAddIncoming( tr_peerMgr      * manager,
1058                       struct in_addr  * addr,
1059                       uint16_t          port,
1060                       int               socket )
1061{
1062    managerLock( manager );
1063
1064    if( getExistingHandshake( manager->incomingHandshakes, addr ) )
1065    {
1066        tr_netClose( socket );
1067    }
1068    else /* we don't have a connetion to them yet... */
1069    {
1070        tr_peerIo * io;
1071        tr_handshake * handshake;
1072
1073        tordbg( NULL, "Got an INCOMING connection with %s", tr_peerIoAddrStr( addr, port ) );
1074
1075        io = tr_peerIoNewIncoming( manager->handle, addr, port, socket );
1076
1077        handshake = tr_handshakeNew( io,
1078                                     manager->handle->encryptionMode,
1079                                     myHandshakeDoneCB,
1080                                     manager );
1081
1082        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake, handshakeCompare );
1083    }
1084
1085    managerUnlock( manager );
1086}
1087
1088void
1089tr_peerMgrAddPex( tr_peerMgr     * manager,
1090                  const uint8_t  * torrentHash,
1091                  uint8_t          from,
1092                  const tr_pex   * pex )
1093{
1094    Torrent * t;
1095    managerLock( manager );
1096
1097    t = getExistingTorrent( manager, torrentHash );
1098    ensureAtomExists( t, &pex->in_addr, pex->port, pex->flags, from );
1099
1100    managerUnlock( manager );
1101}
1102
1103void
1104tr_peerMgrAddPeers( tr_peerMgr    * manager,
1105                    const uint8_t * torrentHash,
1106                    uint8_t         from,
1107                    const uint8_t * peerCompact,
1108                    int             peerCount )
1109{
1110    int i;
1111    const uint8_t * walk = peerCompact;
1112    Torrent * t;
1113
1114    managerLock( manager );
1115
1116    t = getExistingTorrent( manager, torrentHash );
1117    for( i=0; t!=NULL && i<peerCount; ++i )
1118    {
1119        struct in_addr addr;
1120        uint16_t port;
1121        memcpy( &addr, walk, 4 ); walk += 4;
1122        memcpy( &port, walk, 2 ); walk += 2;
1123        ensureAtomExists( t, &addr, port, 0, from );
1124    }
1125
1126    managerUnlock( manager );
1127}
1128
1129/**
1130***
1131**/
1132
1133void
1134tr_peerMgrSetBlame( tr_peerMgr     * manager,
1135                    const uint8_t  * torrentHash,
1136                    int              pieceIndex,
1137                    int              success )
1138{
1139    if( !success )
1140    {
1141        int peerCount, i;
1142        Torrent * t = getExistingTorrent( manager, torrentHash );
1143        tr_peer ** peers;
1144
1145        assert( torrentIsLocked( t ) );
1146
1147        peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1148        for( i=0; i<peerCount; ++i )
1149        {
1150            tr_peer * peer = peers[i];
1151            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1152            {
1153                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1154                           tr_peerIoAddrStr(&peer->in_addr,peer->port),
1155                           pieceIndex, (int)peer->strikes+1 );
1156                addStrike( t, peer );
1157            }
1158        }
1159    }
1160}
1161
1162int
1163tr_pexCompare( const void * va, const void * vb )
1164{
1165    const tr_pex * a = (const tr_pex *) va;
1166    const tr_pex * b = (const tr_pex *) vb;
1167    int i = memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
1168    if( i ) return i;
1169    if( a->port < b->port ) return -1;
1170    if( a->port > b->port ) return 1;
1171    return 0;
1172}
1173
1174int tr_pexCompare( const void * a, const void * b );
1175
1176static int
1177peerPrefersCrypto( const tr_peer * peer )
1178{
1179    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1180        return TRUE;
1181
1182    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1183        return FALSE;
1184
1185    return tr_peerIoIsEncrypted( peer->io );
1186};
1187
1188int
1189tr_peerMgrGetPeers( tr_peerMgr      * manager,
1190                    const uint8_t   * torrentHash,
1191                    tr_pex         ** setme_pex )
1192{
1193    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1194    int i, peerCount;
1195    const tr_peer ** peers;
1196    tr_pex * pex;
1197    tr_pex * walk;
1198
1199    torrentLock( (Torrent*)t );
1200
1201    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1202    pex = walk = tr_new( tr_pex, peerCount );
1203
1204    for( i=0; i<peerCount; ++i, ++walk )
1205    {
1206        const tr_peer * peer = peers[i];
1207
1208        walk->in_addr = peer->in_addr;
1209
1210        walk->port = peer->port;
1211
1212        walk->flags = 0;
1213        if( peerPrefersCrypto(peer) )  walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1214        if( peer->progress >= 1.0 )    walk->flags |= ADDED_F_SEED_FLAG;
1215    }
1216
1217    assert( ( walk - pex ) == peerCount );
1218    qsort( pex, peerCount, sizeof(tr_pex), tr_pexCompare );
1219    *setme_pex = pex;
1220
1221    torrentUnlock( (Torrent*)t );
1222
1223    return peerCount;
1224}
1225
1226static int reconnectPulse( void * vtorrent );
1227static int rechokePulse( void * vtorrent );
1228static int swiftPulse( void * vtorrent );
1229
1230void
1231tr_peerMgrStartTorrent( tr_peerMgr     * manager,
1232                        const uint8_t  * torrentHash )
1233{
1234    Torrent * t;
1235
1236    managerLock( manager );
1237
1238    t = getExistingTorrent( manager, torrentHash );
1239
1240    assert( t != NULL );
1241    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1242    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1243    assert( ( t->isRunning != 0 ) == ( t->swiftTimer != NULL ) );
1244
1245    if( !t->isRunning )
1246    {
1247        t->isRunning = 1;
1248
1249        t->reconnectTimer = tr_timerNew( t->manager->handle,
1250                                         reconnectPulse, t,
1251                                         RECONNECT_PERIOD_MSEC );
1252
1253        t->rechokeTimer = tr_timerNew( t->manager->handle,
1254                                       rechokePulse, t,
1255                                       RECHOKE_PERIOD_MSEC );
1256
1257        t->swiftTimer = tr_timerNew( t->manager->handle,
1258                                     swiftPulse, t,
1259                                     SWIFT_PERIOD_MSEC );
1260
1261        reconnectPulse( t );
1262
1263        rechokePulse( t );
1264
1265        swiftPulse( t );
1266    }
1267
1268    managerUnlock( manager );
1269}
1270
1271static void
1272stopTorrent( Torrent * t )
1273{
1274    assert( torrentIsLocked( t ) );
1275
1276    t->isRunning = 0;
1277    tr_timerFree( &t->rechokeTimer );
1278    tr_timerFree( &t->reconnectTimer );
1279    tr_timerFree( &t->swiftTimer );
1280
1281    /* disconnect the peers. */
1282    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)peerDestructor );
1283    tr_ptrArrayClear( t->peers );
1284
1285    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1286     * which removes the handshake from t->outgoingHandshakes... */
1287    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1288        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1289}
1290void
1291tr_peerMgrStopTorrent( tr_peerMgr     * manager,
1292                       const uint8_t  * torrentHash)
1293{
1294    managerLock( manager );
1295
1296    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1297
1298    managerUnlock( manager );
1299}
1300
1301void
1302tr_peerMgrAddTorrent( tr_peerMgr * manager,
1303                      tr_torrent * tor )
1304{
1305    Torrent * t;
1306
1307    managerLock( manager );
1308
1309    assert( tor != NULL );
1310    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1311
1312    t = torrentConstructor( manager, tor );
1313    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1314
1315    managerUnlock( manager );
1316}
1317
1318void
1319tr_peerMgrRemoveTorrent( tr_peerMgr     * manager,
1320                         const uint8_t  * torrentHash )
1321{
1322    Torrent * t;
1323
1324    managerLock( manager );
1325
1326    t = getExistingTorrent( manager, torrentHash );
1327    assert( t != NULL );
1328    stopTorrent( t );
1329    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1330    torrentDestructor( t );
1331
1332    managerUnlock( manager );
1333}
1334
1335void
1336tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1337                               const uint8_t    * torrentHash,
1338                               int8_t           * tab,
1339                               int                tabCount )
1340{
1341    int i;
1342    const Torrent * t;
1343    const tr_torrent * tor;
1344    float interval;
1345
1346    managerLock( (tr_peerMgr*)manager );
1347
1348    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1349    tor = t->tor;
1350    interval = tor->info.pieceCount / (float)tabCount;
1351
1352    memset( tab, 0, tabCount );
1353
1354    for( i=0; i<tabCount; ++i )
1355    {
1356        const int piece = i * interval;
1357
1358        if( tor == NULL )
1359            tab[i] = 0;
1360        else if( tr_cpPieceIsComplete( tor->completion, piece ) )
1361            tab[i] = -1;
1362        else {
1363            int j, peerCount;
1364            const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1365            for( j=0; j<peerCount; ++j )
1366                if( tr_bitfieldHas( peers[j]->have, i ) )
1367                    ++tab[i];
1368        }
1369    }
1370
1371    managerUnlock( (tr_peerMgr*)manager );
1372}
1373
1374/* Returns the pieces that we and/or a connected peer has */
1375tr_bitfield*
1376tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1377                        const uint8_t    * torrentHash )
1378{
1379    int i, size;
1380    const Torrent * t;
1381    const tr_peer ** peers;
1382    tr_bitfield * pieces;
1383
1384    managerLock( (tr_peerMgr*)manager );
1385
1386    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1387    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1388    pieces = tr_bitfieldDup( tr_cpPieceBitfield( t->tor->completion ) );
1389    for( i=0; i<size; ++i )
1390        if( peers[i]->io != NULL )
1391            tr_bitfieldOr( pieces, peers[i]->have );
1392
1393    managerUnlock( (tr_peerMgr*)manager );
1394    return pieces;
1395}
1396
1397int
1398tr_peerMgrHasConnections( const tr_peerMgr * manager,
1399                          const uint8_t    * torrentHash )
1400{
1401    int ret;
1402    const Torrent * t;
1403    managerLock( (tr_peerMgr*)manager );
1404
1405    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1406    ret = t && tr_ptrArraySize( t->peers );
1407
1408    managerUnlock( (tr_peerMgr*)manager );
1409    return ret;
1410}
1411
1412static int
1413clientIsDownloadingFrom( const tr_peer * peer )
1414{
1415    return peer->clientIsInterested && !peer->clientIsChoked;
1416}
1417
1418static int
1419clientIsUploadingTo( const tr_peer * peer )
1420{
1421    return peer->peerIsInterested && !peer->peerIsChoked;
1422}
1423
1424void
1425tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1426                        const uint8_t    * torrentHash,
1427                        int              * setmePeersKnown,
1428                        int              * setmePeersConnected,
1429                        int              * setmePeersSendingToUs,
1430                        int              * setmePeersGettingFromUs,
1431                        int              * setmePeersFrom )
1432{
1433    int i, size;
1434    const Torrent * t;
1435    const tr_peer ** peers;
1436
1437    managerLock( (tr_peerMgr*)manager );
1438
1439    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1440    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1441
1442    *setmePeersKnown          = tr_ptrArraySize( t->pool );
1443    *setmePeersConnected      = 0;
1444    *setmePeersSendingToUs    = 0;
1445    *setmePeersGettingFromUs  = 0;
1446
1447    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1448        setmePeersFrom[i] = 0;
1449
1450    for( i=0; i<size; ++i )
1451    {
1452        const tr_peer * peer = peers[i];
1453        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1454
1455        if( peer->io == NULL ) /* not connected */
1456            continue;
1457
1458        ++*setmePeersConnected;
1459
1460        ++setmePeersFrom[atom->from];
1461
1462        if( clientIsUploadingTo( peer ) )
1463            ++*setmePeersGettingFromUs;
1464
1465        if( clientIsDownloadingFrom( peer ) )
1466            ++*setmePeersSendingToUs;
1467    }
1468
1469    managerUnlock( (tr_peerMgr*)manager );
1470}
1471
1472struct tr_peer_stat *
1473tr_peerMgrPeerStats( const tr_peerMgr  * manager,
1474                     const uint8_t     * torrentHash,
1475                     int               * setmeCount UNUSED )
1476{
1477    int i, size;
1478    const Torrent * t;
1479    tr_peer ** peers;
1480    tr_peer_stat * ret;
1481
1482    assert( manager != NULL );
1483    managerLock( (tr_peerMgr*)manager );
1484
1485    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1486    peers = getConnectedPeers( (Torrent*)t, &size );
1487    ret = tr_new0( tr_peer_stat, size );
1488
1489    for( i=0; i<size; ++i )
1490    {
1491        const tr_peer * peer = peers[i];
1492        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1493        tr_peer_stat * stat = ret + i;
1494
1495        tr_netNtop( &peer->in_addr, stat->addr, sizeof(stat->addr) );
1496        strlcpy( stat->client, (peer->client ? peer->client : ""), sizeof(stat->client) );
1497        stat->port             = peer->port;
1498        stat->from             = atom->from;
1499        stat->progress         = peer->progress;
1500        stat->isEncrypted      = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1501        stat->uploadToRate     = peer->rateToPeer;
1502        stat->downloadFromRate = peer->rateToClient;
1503        stat->isDownloading    = clientIsDownloadingFrom( peer );
1504        stat->isUploading      = clientIsUploadingTo( peer );
1505        stat->status           = peer->status;
1506    }
1507
1508    *setmeCount = size;
1509    tr_free( peers );
1510
1511    managerUnlock( (tr_peerMgr*)manager );
1512    return ret;
1513}
1514
1515/**
1516***
1517**/
1518
1519struct ChokeData
1520{
1521    uint8_t doUnchoke;
1522    uint8_t isInterested;
1523    uint32_t rate;
1524    tr_peer * peer;
1525};
1526
1527static int
1528compareChoke( const void * va, const void * vb )
1529{
1530    const struct ChokeData * a = va;
1531    const struct ChokeData * b = vb;
1532    return -tr_compareUint32( a->rate, b->rate );
1533}
1534
1535static int
1536isNew( const tr_peer * peer )
1537{
1538    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1539}
1540
1541static int
1542isSame( const tr_peer * peer )
1543{
1544    return peer && peer->client && strstr( peer->client, "Transmission" );
1545}
1546
1547/**
1548***
1549**/
1550
1551static double
1552getWeightedRate( const tr_peer * peer, int clientIsSeed )
1553{
1554    return (int)( 10.0 * ( clientIsSeed ? peer->rateToPeer
1555                                        : peer->rateToClient ) );
1556}
1557
1558static void
1559rechoke( Torrent * t )
1560{
1561    int i, n, peerCount, size, unchokedInterested;
1562    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1563    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1564    const int clientIsSeed = tr_torrentIsSeed( t->tor );
1565
1566    assert( torrentIsLocked( t ) );
1567   
1568    /* sort the peers by preference and rate */
1569    for( i=0, size=0; i<peerCount; ++i )
1570    {
1571        tr_peer * peer = peers[i];
1572        if( peer->progress >= 1.0 ) /* choke all seeds */
1573            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1574        else {
1575            struct ChokeData * node = &choke[size++];
1576            node->peer = peer;
1577            node->isInterested = peer->peerIsInterested;
1578            node->rate = getWeightedRate( peer, clientIsSeed );
1579        }
1580    }
1581
1582    qsort( choke, size, sizeof(struct ChokeData), compareChoke );
1583
1584    /**
1585     * Reciprocation and number of uploads capping is managed by unchoking
1586     * the N peers which have the best upload rate and are interested.
1587     * This maximizes the client's download rate. These N peers are
1588     * referred to as downloaders, because they are interested in downloading
1589     * from the client.
1590     *
1591     * Peers which have a better upload rate (as compared to the downloaders)
1592     * but aren't interested get unchoked. If they become interested, the
1593     * downloader with the worst upload rate gets choked. If a client has
1594     * a complete file, it uses its upload rate rather than its download
1595     * rate to decide which peers to unchoke.
1596     */
1597    unchokedInterested = 0;
1598    for( i=0; i<size && unchokedInterested<MAX_UNCHOKED_PEERS; ++i ) {
1599        choke[i].doUnchoke = 1;
1600        if( choke[i].isInterested )
1601            ++unchokedInterested;
1602    }
1603    n = i;
1604    while( i<size )
1605        choke[i++].doUnchoke = 0;
1606
1607    /* optimistic unchoke */
1608    if( i < size )
1609    {
1610        struct ChokeData * c;
1611        tr_ptrArray * randPool = tr_ptrArrayNew( );
1612        for( ; i<size; ++i )
1613        {
1614            const tr_peer * peer = choke[i].peer;
1615            int x=1, y;
1616            if( isNew( peer ) ) x *= 3;
1617            if( isSame( peer ) ) x *= 3;
1618            for( y=0; y<x; ++y )
1619                tr_ptrArrayAppend( randPool, choke );
1620        }
1621        i = tr_rand( tr_ptrArraySize( randPool ) );
1622        c = ( struct ChokeData* )tr_ptrArrayNth( randPool, i);
1623        c->doUnchoke = 1;
1624        t->optimistic = c->peer;
1625        tr_ptrArrayFree( randPool, NULL );
1626    }
1627
1628    for( i=0; i<size; ++i )
1629        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1630
1631    /* cleanup */
1632    tr_free( choke );
1633    tr_free( peers );
1634}
1635
1636static int
1637rechokePulse( void * vtorrent )
1638{
1639    Torrent * t = vtorrent;
1640    torrentLock( t );
1641    rechoke( t );
1642    torrentUnlock( t );
1643    return TRUE;
1644}
1645
1646/***
1647****
1648***/
1649
1650static int
1651swiftPulse( void * vtorrent )
1652{
1653    Torrent * t = vtorrent;
1654    torrentLock( t );
1655
1656    if( !tr_torrentIsSeed( t->tor ) )
1657    {
1658        int i;
1659        int peerCount = 0;
1660        int deadbeatCount = 0;
1661        tr_peer ** peers = getConnectedPeers( t, &peerCount );
1662        tr_peer ** deadbeats = tr_new( tr_peer*, peerCount );
1663
1664        const double ul_KiBsec = tr_rcRate( t->tor->upload );
1665        const double ul_KiB = ul_KiBsec * (SWIFT_PERIOD_MSEC/1000.0);
1666        const double ul_bytes = ul_KiB * 1024;
1667        const double freeCreditTotal = ul_bytes * SWIFT_LARGESSE;
1668        int freeCreditPerPeer;
1669
1670        for( i=0; i<peerCount; ++i ) {
1671            tr_peer * peer = peers[i];
1672            if( peer->credit <= 0 )
1673                deadbeats[deadbeatCount++] =  peer;
1674        }
1675
1676        freeCreditPerPeer = (int)( freeCreditTotal / deadbeatCount );
1677        for( i=0; i<deadbeatCount; ++i )
1678            deadbeats[i]->credit = freeCreditPerPeer;
1679
1680        tordbg( t, "%d deadbeats, "
1681            "who are each being granted %d bytes' credit "
1682            "for a total of %.1f KiB, "
1683            "%d%% of the torrent's ul speed %.1f\n",
1684            deadbeatCount, freeCreditPerPeer,
1685            ul_KiBsec*SWIFT_LARGESSE, (int)(SWIFT_LARGESSE*100), ul_KiBsec );
1686
1687        tr_free( deadbeats );
1688        tr_free( peers );
1689    }
1690
1691    torrentUnlock( t );
1692    return TRUE;
1693}
1694
1695/***
1696****
1697****  Life and Death
1698****
1699***/
1700
1701static int
1702shouldPeerBeClosed( const Torrent * t, const tr_peer * peer, int peerCount )
1703{
1704    const tr_torrent * tor = t->tor;
1705    const time_t now = time( NULL );
1706    const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1707
1708    /* if it's marked for purging, close it */
1709    if( peer->doPurge ) {
1710        tordbg( t, "purging peer %s because its doPurge flag is set", tr_peerIoAddrStr(&atom->addr,atom->port) );
1711        return TRUE;
1712    }
1713
1714    /* if we're both seeds and it's been long enough for a pex exchange, close it */
1715    if( 1 ) {
1716        const int clientIsSeed = tr_torrentIsSeed( tor );
1717        const int peerIsSeed = atom->flags & ADDED_F_SEED_FLAG;
1718        if( peerIsSeed && clientIsSeed && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30) ) ) {
1719            tordbg( t, "purging peer %s because we're both seeds", tr_peerIoAddrStr(&atom->addr,atom->port) );
1720            return TRUE;
1721        }
1722    }
1723
1724    /* disconnect if it's been too long since piece data has been transferred.
1725     * this is on a sliding scale based on number of available peers... */
1726    if( 1 ) {
1727        const int relaxStrictnessIfFewerThanN = (int)((getMaxPeerCount(tor) * 0.9) + 0.5);
1728        /* if we have >= relaxIfFewerThan, strictness is 100%.
1729         * if we have zero connections, strictness is 0% */
1730        const double strictness = peerCount >= relaxStrictnessIfFewerThanN
1731            ? 1.0
1732            : peerCount / (double)relaxStrictnessIfFewerThanN;
1733        const int lo = MIN_UPLOAD_IDLE_SECS;
1734        const int hi = MAX_UPLOAD_IDLE_SECS;
1735        const int limit = lo + ((hi-lo) * strictness);
1736        const time_t then = peer->pieceDataActivityDate;
1737        const int idleTime = then ? (now-then) : 0;
1738        if( idleTime > limit ) {
1739            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
1740                       tr_peerIoAddrStr(&atom->addr,atom->port), idleTime );
1741            return TRUE;
1742        }
1743    }
1744
1745    return FALSE;
1746}
1747
1748static tr_peer **
1749getPeersToClose( Torrent * t, int * setmeSize )
1750{
1751    int i, peerCount, outsize;
1752    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( t->peers, &peerCount );
1753    struct tr_peer ** ret = tr_new( tr_peer*, peerCount );
1754
1755    assert( torrentIsLocked( t ) );
1756
1757    for( i=outsize=0; i<peerCount; ++i )
1758        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
1759            ret[outsize++] = peers[i];
1760
1761    *setmeSize = outsize;
1762    return ret;
1763}
1764
1765static int
1766compareCandidates( const void * va, const void * vb )
1767{
1768    const struct peer_atom * a = * (const struct peer_atom**) va;
1769    const struct peer_atom * b = * (const struct peer_atom**) vb;
1770    int i;
1771
1772    if( a->piece_data_time > b->piece_data_time ) return -1;
1773    if( a->piece_data_time < b->piece_data_time ) return  1;
1774
1775    if(( i = tr_compareUint16( a->numFails, b->numFails )))
1776        return i;
1777
1778    if( a->time != b->time )
1779        return a->time < b->time ? -1 : 1;
1780
1781    return 0;
1782}
1783
1784static struct peer_atom **
1785getPeerCandidates( Torrent * t, int * setmeSize )
1786{
1787    int i, atomCount, retCount;
1788    struct peer_atom ** atoms;
1789    struct peer_atom ** ret;
1790    const time_t now = time( NULL );
1791    const int seed = tr_torrentIsSeed( t->tor );
1792
1793    assert( torrentIsLocked( t ) );
1794
1795    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &atomCount );
1796    ret = tr_new( struct peer_atom*, atomCount );
1797    for( i=retCount=0; i<atomCount; ++i )
1798    {
1799        struct peer_atom * atom = atoms[i];
1800
1801        /* peer fed us too much bad data ... we only keep it around
1802         * now to weed it out in case someone sends it to us via pex */
1803        if( atom->myflags & MYFLAG_BANNED )
1804            continue;
1805
1806        /* peer was unconnectable before, so we're not going to keep trying.
1807         * this is needs a separate flag from `banned', since if they try
1808         * to connect to us later, we'll let them in */
1809        if( atom->myflags & MYFLAG_UNREACHABLE )
1810            continue;
1811
1812        /* we don't need two connections to the same peer... */
1813        if( peerIsInUse( t, &atom->addr ) )
1814            continue;
1815
1816        /* no need to connect if we're both seeds... */
1817        if( seed && (atom->flags & ADDED_F_SEED_FLAG) )
1818            continue;
1819
1820        /* we're wasting our time trying to connect to this bozo. */
1821        if( atom->numFails > 3 )
1822            continue;
1823
1824        /* If we were connected to this peer recently and transferring
1825         * piece data, try to reconnect -- network troubles may have
1826         * disconnected us.  but if we weren't sharing piece data,
1827         * hold off on this peer to give another one a try instead */
1828        if( ( now - atom->piece_data_time ) > 30 )
1829        {
1830            int minWait = (60 * 10); /* ten minutes */
1831            int maxWait = (60 * 30); /* thirty minutes */
1832            int wait = atom->numFails * minWait;
1833            if( wait < minWait ) wait = minWait;
1834            if( wait > maxWait ) wait = maxWait;
1835            if( ( now - atom->time ) < wait ) {
1836                tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
1837                        i, tr_peerIoAddrStr(&atom->addr,atom->port), wait );
1838                continue;
1839            }
1840        }
1841
1842        ret[retCount++] = atom;
1843    }
1844
1845    qsort( ret, retCount, sizeof(struct peer_atom*), compareCandidates );
1846    *setmeSize = retCount;
1847    return ret;
1848}
1849
1850static int
1851reconnectPulse( void * vtorrent )
1852{
1853    Torrent * t = vtorrent;
1854    static time_t prevTime = 0;
1855    static int newConnectionsThisSecond = 0;
1856    time_t now;
1857
1858    torrentLock( t );
1859
1860    now = time( NULL );
1861    if( prevTime != now )
1862    {
1863        prevTime = now;
1864        newConnectionsThisSecond = 0;
1865    }
1866
1867    if( !t->isRunning )
1868    {
1869        removeAllPeers( t );
1870    }
1871    else
1872    {
1873        int i, nCandidates, nBad;
1874        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
1875        struct tr_peer ** connections = getPeersToClose( t, &nBad );
1876
1877        if( nBad || nCandidates )
1878            tordbg( t, "reconnect pulse for [%s]: %d bad connections, "
1879                       "%d connection candidates, %d atoms, max per pulse is %d",
1880                       t->tor->info.name, nBad, nCandidates,
1881                       tr_ptrArraySize(t->pool),
1882                       (int)MAX_RECONNECTIONS_PER_PULSE );
1883
1884        /* disconnect some peers.
1885           if we got transferred piece data, then they might be good peers,
1886           so reset their `numFails' weight to zero.  otherwise we connected
1887           to them fruitlessly, so mark it as another fail */
1888        for( i=0; i<nBad; ++i ) {
1889            tr_peer * peer = connections[i];
1890            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1891            if( peer->pieceDataActivityDate )
1892                atom->numFails = 0;
1893            else
1894                ++atom->numFails;
1895            removePeer( t, peer );
1896        }
1897
1898        /* add some new ones */
1899        for( i=0;    i < nCandidates
1900                  && i < MAX_RECONNECTIONS_PER_PULSE
1901                  && newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND; ++i )
1902        {
1903            tr_peerMgr * mgr = t->manager;
1904            struct peer_atom * atom = candidates[i];
1905            tr_peerIo * io;
1906
1907            tordbg( t, "Starting an OUTGOING connection with %s",
1908                       tr_peerIoAddrStr( &atom->addr, atom->port ) );
1909
1910            io = tr_peerIoNewOutgoing( mgr->handle, &atom->addr, atom->port, t->hash );
1911            if( io == NULL )
1912            {
1913                atom->myflags |= MYFLAG_UNREACHABLE;
1914            }
1915            else
1916            {
1917                tr_handshake * handshake = tr_handshakeNew( io,
1918                                                            mgr->handle->encryptionMode,
1919                                                            myHandshakeDoneCB,
1920                                                            mgr );
1921
1922                assert( tr_peerIoGetTorrentHash( io ) != NULL );
1923
1924                ++newConnectionsThisSecond;
1925
1926                tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake, handshakeCompare );
1927            }
1928
1929            atom->time = time( NULL );
1930        }
1931
1932        /* cleanup */
1933        tr_free( connections );
1934        tr_free( candidates );
1935    }
1936
1937    torrentUnlock( t );
1938    return TRUE;
1939}
Note: See TracBrowser for help on using the repository browser.