source: branches/0.9x/libtransmission/peer-mgr.c @ 3931

Last change on this file since 3931 was 3931, checked in by charles, 15 years ago

tweaks

  • Property svn:keywords set to Date Rev Author Id
File size: 51.9 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 3931 2007-11-22 20:42:18Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17#include <limits.h> /* INT_MAX */
18
19#include <libgen.h> /* basename */
20#include <arpa/inet.h> /* inet_ntoa */
21
22#include <event.h>
23
24#include "transmission.h"
25#include "clients.h"
26#include "completion.h"
27#include "crypto.h"
28#include "handshake.h"
29#include "net.h"
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-mgr-private.h"
33#include "peer-msgs.h"
34#include "platform.h"
35#include "ptrarray.h"
36#include "ratecontrol.h"
37#include "shared.h"
38#include "trcompat.h" /* strlcpy */
39#include "trevent.h"
40#include "utils.h"
41
42/**
43*** The "SWIFT" system is described by Karthik Tamilmani,
44*** Vinay Pai, and Alexander Mohr of Stony Brook University
45*** in their paper "SWIFT: A System With Incentives For Trading"
46*** http://citeseer.ist.psu.edu/tamilmani04swift.html
47***
48*** More SWIFT constants are defined in peer-mgr-private.h
49**/
50
51/**
52 * Allow new peers to download this many bytes from
53 * us when getting started.  This can prevent gridlock
54 * with other peers using tit-for-tat algorithms
55 */
56static const int SWIFT_INITIAL_CREDIT = 64 * 1024; /* 64 KiB */
57
58/**
59 * We expend a fraction of our torrent's total upload speed
60 * on largesse by uniformly distributing free credit to
61 * all of our peers.  This too helps prevent gridlock.
62 */
63static const double SWIFT_LARGESSE = 0.10; /* 10% of our UL */
64
65/**
66 * How frequently to extend largesse-based credit
67 */
68static const int SWIFT_PERIOD_MSEC = 5000;
69
70
71enum
72{
73    /* how frequently to change which peers are choked */
74    RECHOKE_PERIOD_MSEC = (1000),
75
76    /* how frequently to decide which peers live and die */
77    RECONNECT_PERIOD_MSEC = (6 * 1000),
78
79    /* how frequently to refill peers' request lists */
80    REFILL_PERIOD_MSEC = 666,
81
82    /* don't change a peer's choke status more often than this */
83    MIN_CHOKE_PERIOD_SEC = 10,
84
85    /* following the BT spec, we consider ourselves `snubbed' if
86     * we're we don't get piece data from a peer in this long */
87    SNUBBED_SEC = 60,
88
89    /* arbitrary */
90    MAX_CONNECTED_PEERS_PER_TORRENT = 50,
91
92    /* when many peers are available, keep idle ones this long */
93    MIN_UPLOAD_IDLE_SECS = (60 * 3),
94
95    /* when few peers are available, keep idle ones this long */
96    MAX_UPLOAD_IDLE_SECS = (60 * 10),
97
98    /* how many peers to unchoke per-torrent. */
99    /* FIXME: make this user-configurable? */
100    NUM_UNCHOKED_PEERS_PER_TORRENT = 20, /* arbitrary */
101
102    /* set this too high and there will be a lot of churn.
103     * set it too low and you'll get peers too slowly */
104    MAX_RECONNECTIONS_PER_PULSE = 5,
105
106    /* corresponds to ut_pex's added.f flags */
107    ADDED_F_ENCRYPTION_FLAG = 1,
108    /* corresponds to ut_pex's added.f flags */
109    ADDED_F_SEED_FLAG = 2,
110
111    /* number of bad pieces a peer is allowed to send before we ban them */
112    MAX_BAD_PIECES_PER_PEER = 3,
113    /* use for bitwise operations w/peer_atom.myflags */
114    MYFLAG_BANNED = 1
115};
116
117
118/**
119***
120**/
121
122/* We keep one of these for every peer we know about, whether
123 * it's connected or not, so the struct must be small.
124 * When our current connections underperform, we dip back
125 * into this list for new ones. */
126struct peer_atom
127{   
128    uint8_t from;
129    uint8_t flags; /* these match the added_f flags */
130    uint8_t myflags; /* flags that aren't defined in added_f */
131    uint16_t port;
132    uint16_t numFails;
133    struct in_addr addr;
134    time_t time;
135};
136
137typedef struct
138{
139    uint8_t hash[SHA_DIGEST_LENGTH];
140    tr_ptrArray * outgoingHandshakes; /* tr_handshake */
141    tr_ptrArray * pool; /* struct peer_atom */
142    tr_ptrArray * peers; /* tr_peer */
143    tr_timer * reconnectTimer;
144    tr_timer * rechokeTimer;
145    tr_timer * refillTimer;
146    tr_timer * swiftTimer;
147    tr_torrent * tor;
148    tr_bitfield * requested;
149
150    unsigned int isRunning : 1;
151
152    struct tr_peerMgr * manager;
153}
154Torrent;
155
156struct tr_peerMgr
157{
158    tr_handle * handle;
159    tr_ptrArray * torrents; /* Torrent */
160    tr_ptrArray * incomingHandshakes; /* tr_handshake */
161};
162
163/**
164***
165**/
166
167static void
168myDebug( const char * file, int line, const Torrent * t, const char * fmt, ... )
169{
170    FILE * fp = tr_getLog( );
171    if( fp != NULL )
172    {
173        va_list args;
174        char timestr[64];
175        struct evbuffer * buf = evbuffer_new( );
176        char * myfile = tr_strdup( file );
177
178        evbuffer_add_printf( buf, "[%s] ", tr_getLogTimeStr( timestr, sizeof(timestr) ) );
179        if( t != NULL )
180            evbuffer_add_printf( buf, "%s ", t->tor->info.name );
181        va_start( args, fmt );
182        evbuffer_add_vprintf( buf, fmt, args );
183        va_end( args );
184        evbuffer_add_printf( buf, " (%s:%d)\n", basename(myfile), line );
185        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
186
187        tr_free( myfile );
188        evbuffer_free( buf );
189    }
190}
191
192#define tordbg(t, fmt...) myDebug(__FILE__, __LINE__, t, ##fmt )
193
194/**
195***
196**/
197
198static void
199managerLock( struct tr_peerMgr * manager )
200{
201    tr_globalLock( manager->handle );
202}
203static void
204managerUnlock( struct tr_peerMgr * manager )
205{
206    tr_globalUnlock( manager->handle );
207}
208static void
209torrentLock( Torrent * torrent )
210{
211    managerLock( torrent->manager );
212}
213static void
214torrentUnlock( Torrent * torrent )
215{
216    managerUnlock( torrent->manager );
217}
218static int
219torrentIsLocked( const Torrent * t )
220{
221    return tr_globalIsLocked( t->manager->handle );
222}
223
224/**
225***
226**/
227
228static int
229compareAddresses( const struct in_addr * a, const struct in_addr * b )
230{
231    return tr_compareUint32( a->s_addr, b->s_addr );
232}
233
234static int
235handshakeCompareToAddr( const void * va, const void * vb )
236{
237    const tr_handshake * a = va;
238    return compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
239}
240
241static int
242handshakeCompare( const void * a, const void * b )
243{
244    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
245}
246
247static tr_handshake*
248getExistingHandshake( tr_ptrArray * handshakes, const struct in_addr * in_addr )
249{
250    return tr_ptrArrayFindSorted( handshakes,
251                                  in_addr,
252                                  handshakeCompareToAddr );
253}
254
255static int
256comparePeerAtomToAddress( const void * va, const void * vb )
257{
258    const struct peer_atom * a = va;
259    return compareAddresses( &a->addr, vb );
260}
261
262static int
263comparePeerAtoms( const void * va, const void * vb )
264{
265    const struct peer_atom * b = vb;
266    return comparePeerAtomToAddress( va, &b->addr );
267}
268
269/**
270***
271**/
272
273static int
274torrentCompare( const void * va, const void * vb )
275{
276    const Torrent * a = va;
277    const Torrent * b = vb;
278    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
279}
280
281static int
282torrentCompareToHash( const void * va, const void * vb )
283{
284    const Torrent * a = va;
285    const uint8_t * b_hash = vb;
286    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
287}
288
289static Torrent*
290getExistingTorrent( tr_peerMgr * manager, const uint8_t * hash )
291{
292    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
293                                             hash,
294                                             torrentCompareToHash );
295}
296
297static int
298peerCompare( const void * va, const void * vb )
299{
300    const tr_peer * a = va;
301    const tr_peer * b = vb;
302    return compareAddresses( &a->in_addr, &b->in_addr );
303}
304
305static int
306peerCompareToAddr( const void * va, const void * vb )
307{
308    const tr_peer * a = va;
309    return compareAddresses( &a->in_addr, vb );
310}
311
312static tr_peer*
313getExistingPeer( Torrent * torrent, const struct in_addr * in_addr )
314{
315    assert( torrentIsLocked( torrent ) );
316    assert( in_addr != NULL );
317
318    return (tr_peer*) tr_ptrArrayFindSorted( torrent->peers,
319                                             in_addr,
320                                             peerCompareToAddr );
321}
322
323static struct peer_atom*
324getExistingAtom( const Torrent * t, const struct in_addr * addr )
325{
326    assert( torrentIsLocked( t ) );
327    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
328}
329
330static int
331peerIsInUse( const Torrent * ct, const struct in_addr * addr )
332{
333    Torrent * t = (Torrent*) ct;
334
335    assert( torrentIsLocked ( t ) );
336
337    return getExistingPeer( t, addr )
338        || getExistingHandshake( t->outgoingHandshakes, addr )
339        || getExistingHandshake( t->manager->incomingHandshakes, addr );
340}
341
342static tr_peer*
343peerConstructor( const struct in_addr * in_addr )
344{
345    tr_peer * p;
346    p = tr_new0( tr_peer, 1 );
347    p->credit = SWIFT_INITIAL_CREDIT;
348    p->rcToClient = tr_rcInit( );
349    p->rcToPeer = tr_rcInit( );
350    memcpy( &p->in_addr, in_addr, sizeof(struct in_addr) );
351    return p;
352}
353
354static tr_peer*
355getPeer( Torrent * torrent, const struct in_addr * in_addr )
356{
357    tr_peer * peer;
358
359    assert( torrentIsLocked( torrent ) );
360
361    peer = getExistingPeer( torrent, in_addr );
362
363    if( peer == NULL ) {
364        peer = peerConstructor( in_addr );
365        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
366    }
367
368    return peer;
369}
370
371static void
372peerDestructor( tr_peer * peer )
373{
374    assert( peer != NULL );
375    assert( peer->msgs != NULL );
376
377    tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
378    tr_peerMsgsFree( peer->msgs );
379
380    tr_peerIoFree( peer->io );
381
382    tr_bitfieldFree( peer->have );
383    tr_bitfieldFree( peer->blame );
384    tr_rcClose( peer->rcToClient );
385    tr_rcClose( peer->rcToPeer );
386    tr_free( peer->client );
387    tr_free( peer );
388}
389
390static void
391removePeer( Torrent * t, tr_peer * peer )
392{
393    tr_peer * removed;
394    struct peer_atom * atom;
395
396    assert( torrentIsLocked( t ) );
397
398    atom = getExistingAtom( t, &peer->in_addr );
399    assert( atom != NULL );
400    atom->time = time( NULL );
401
402    removed = tr_ptrArrayRemoveSorted  ( t->peers, peer, peerCompare );
403    assert( removed == peer );
404    peerDestructor( removed );
405}
406
407static void
408removeAllPeers( Torrent * t )
409{
410    while( !tr_ptrArrayEmpty( t->peers ) )
411        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
412}
413
414static void
415torrentDestructor( Torrent * t )
416{
417    uint8_t hash[SHA_DIGEST_LENGTH];
418
419    assert( t != NULL );
420    assert( !t->isRunning );
421    assert( t->peers != NULL );
422    assert( torrentIsLocked( t ) );
423    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
424    assert( tr_ptrArrayEmpty( t->peers ) );
425
426    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
427
428    tr_timerFree( &t->reconnectTimer );
429    tr_timerFree( &t->rechokeTimer );
430    tr_timerFree( &t->refillTimer );
431    tr_timerFree( &t->swiftTimer );
432
433    tr_bitfieldFree( t->requested );
434    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
435    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
436    tr_ptrArrayFree( t->peers, NULL );
437
438    tr_free( t );
439}
440
441static Torrent*
442torrentConstructor( tr_peerMgr * manager, tr_torrent * tor )
443{
444    Torrent * t;
445
446    t = tr_new0( Torrent, 1 );
447    t->manager = manager;
448    t->tor = tor;
449    t->pool = tr_ptrArrayNew( );
450    t->peers = tr_ptrArrayNew( );
451    t->outgoingHandshakes = tr_ptrArrayNew( );
452    t->requested = tr_bitfieldNew( tor->blockCount );
453    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
454
455    return t;
456}
457
458/**
459***
460**/
461
462struct tr_bitfield *
463tr_peerMgrGenerateAllowedSet( const uint32_t         setCount,
464                              const uint32_t         pieceCount,
465                              const uint8_t          infohash[20],
466                              const struct in_addr * ip )
467{
468    /* This has been checked against the spec example implementation. Feeding it with :
469    setCount = 9, pieceCount = 1313, infohash = Oxaa,0xaa,...0xaa, ip = 80.4.4.200
470generate :
471    1059, 431, 808, 1217, 287, 376, 1188, 353, 508
472    but since we're storing in a bitfield, it won't be in this order... */
473    /* TODO : We should translate link-local IPv4 adresses to external IP,
474     * so that being on same local network gives us the same allowed pieces */
475   
476    uint8_t *seed;
477    char buf[4];
478    uint32_t allowedPieceCount = 0;
479    tr_bitfield_t * ret;
480
481#if 0
482    printf( "%d piece allowed fast set for torrent with %d pieces and hex infohash\n", setCount, pieceCount );
483    printf( "%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x for node with IP %s:\n",
484            infohash[0], infohash[1], infohash[2], infohash[3], infohash[4], infohash[5], infohash[6], infohash[7], infohash[8], infohash[9],
485            infohash[10], infohash[11], infohash[12], infohash[13], infohash[14], infohash[15], infohash[16], infohash[7], infohash[18], infohash[19],
486            inet_ntoa( *ip ) );
487#endif
488   
489    seed = malloc(4 + SHA_DIGEST_LENGTH);
490   
491    ret = tr_bitfieldNew( pieceCount );
492   
493    /* We need a seed based on most significant bytes of peer address
494        concatenated with torrent's infohash */
495    *(uint32_t*)buf = ntohl( htonl(ip->s_addr) & 0xffffff00 );
496   
497    memcpy( seed, &buf, 4 );
498    memcpy( seed + 4, infohash, SHA_DIGEST_LENGTH );
499   
500    tr_sha1( seed, seed, 4 + SHA_DIGEST_LENGTH, NULL );
501   
502    while ( allowedPieceCount < setCount )
503    {
504        int i;
505        for ( i = 0 ; i < 5 && allowedPieceCount < setCount ; i++ )
506        {
507            /* We generate indices from 4-byte chunks of the seed */
508            uint32_t j = i * 4;
509            uint32_t y = ntohl( *(uint32_t*)(seed + j) );
510            uint32_t index = y % pieceCount;
511           
512            if ( !tr_bitfieldHas( ret, index ) )
513            {
514                tr_bitfieldAdd( ret, index );
515                allowedPieceCount++;
516            }
517        }
518        /* We randomize the seed, in case we need to iterate more */
519        tr_sha1( seed, seed, SHA_DIGEST_LENGTH, NULL );
520    }
521    tr_free( seed );
522   
523    return ret;
524}
525
526tr_peerMgr*
527tr_peerMgrNew( tr_handle * handle )
528{
529    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
530    m->handle = handle;
531    m->torrents = tr_ptrArrayNew( );
532    m->incomingHandshakes = tr_ptrArrayNew( );
533    return m;
534}
535
536void
537tr_peerMgrFree( tr_peerMgr * manager )
538{
539    managerLock( manager );
540
541    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
542     * the item from manager->handshakes, so this is a little roundabout... */
543    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
544        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
545    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
546
547    /* free the torrents. */
548    tr_ptrArrayFree( manager->torrents, (PtrArrayForeachFunc)torrentDestructor );
549
550    managerUnlock( manager );
551    tr_free( manager );
552}
553
554static tr_peer**
555getConnectedPeers( Torrent * t, int * setmeCount )
556{
557    int i, peerCount, connectionCount;
558    tr_peer **peers;
559    tr_peer **ret;
560
561    assert( torrentIsLocked( t ) );
562
563    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
564    ret = tr_new( tr_peer*, peerCount );
565
566    for( i=connectionCount=0; i<peerCount; ++i )
567        if( peers[i]->msgs != NULL )
568            ret[connectionCount++] = peers[i];
569
570    *setmeCount = connectionCount;
571    return ret;
572}
573
574/***
575****  Refill
576***/
577
578struct tr_refill_piece
579{
580    tr_priority_t priority;
581    int percentDone;
582    uint16_t random;
583    uint32_t piece;
584    uint32_t peerCount;
585    uint32_t fastAllowed;
586    uint32_t suggested;
587};
588
589static int
590compareRefillPiece (const void * aIn, const void * bIn)
591{
592    const struct tr_refill_piece * a = aIn;
593    const struct tr_refill_piece * b = bIn;
594   
595    /* if one *might be* fastallowed to us, get it first...
596     * I'm putting it on top so we prioritize those pieces at
597     * startup, then we'll have them, and we'll be denied access
598     * to them */
599    if (a->fastAllowed != b->fastAllowed)
600        return a->fastAllowed < b->fastAllowed ? -1 : 1;
601   
602    /* if one piece has a higher priority, it goes first */
603    if (a->priority != b->priority)
604        return a->priority > b->priority ? -1 : 1;
605   
606    /* otherwise if one was suggested to us, get it */
607    if (a->suggested != b->suggested)
608        return a->suggested < b->suggested ? -1 : 1;
609
610    /* try to fill partial pieces */
611    if( a->percentDone != b->percentDone )
612        return a->percentDone > b->percentDone ? -1 : 1;
613   
614    /* otherwise if one has fewer peers, it goes first */
615    if (a->peerCount != b->peerCount)
616        return a->peerCount < b->peerCount ? -1 : 1;
617
618    /* otherwise go with our random seed */
619    return tr_compareUint16( a->random, b->random );
620}
621
622static int
623isPieceInteresting( const tr_torrent  * tor,
624                    int                 piece )
625{
626    if( tor->info.pieces[piece].dnd ) /* we don't want it */
627        return 0;
628
629    if( tr_cpPieceIsComplete( tor->completion, piece ) ) /* we have it */
630        return 0;
631
632    return 1;
633}
634
635static uint32_t*
636getPreferredPieces( Torrent     * t,
637                    uint32_t    * pieceCount )
638{
639    const tr_torrent * tor = t->tor;
640    const tr_info * inf = &tor->info;
641    int i;
642    uint32_t poolSize = 0;
643    uint32_t * pool = tr_new( uint32_t, inf->pieceCount );
644    int peerCount;
645    tr_peer** peers;
646
647    assert( torrentIsLocked( t ) );
648
649    peers = getConnectedPeers( t, &peerCount );
650
651    for( i=0; i<inf->pieceCount; ++i )
652        if( isPieceInteresting( tor, i ) )
653            pool[poolSize++] = i;
654
655    /* sort the pool from most interesting to least... */
656    if( poolSize > 1 )
657    {
658        uint32_t j;
659        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
660
661        for( j=0; j<poolSize; ++j )
662        {
663            int k;
664            const int piece = pool[j];
665            struct tr_refill_piece * setme = p + j;
666
667            setme->piece = piece;
668            setme->priority = inf->pieces[piece].priority;
669            setme->peerCount = 0;
670            setme->fastAllowed = 0;
671            setme->random = tr_rand( UINT16_MAX );
672            setme->percentDone = (int)( 100.0 * tr_cpPercentBlocksInPiece( tor->completion, piece ) );
673
674            for( k=0; k<peerCount; ++k ) {
675                const tr_peer * peer = peers[k];
676                if( peer->peerIsInterested && !peer->clientIsChoked && tr_bitfieldHas( peer->have, piece ) )
677                    ++setme->peerCount;
678                /* The fast peer extension doesn't force a peer to actually HAVE a fast-allowed piece,
679                    but we're guaranteed to get the same pieces from different peers,
680                    so we'll build a list and pray one actually have this one */
681                setme->fastAllowed = tr_peerMsgsIsPieceFastAllowed( peer->msgs, i );
682                /* Also, if someone SUGGESTed a piece to us, prioritize it over non-suggested others
683                 */
684                setme->suggested   = tr_peerMsgsIsPieceSuggested( peer->msgs, i );
685            }
686        }
687
688        qsort( p, poolSize, sizeof(struct tr_refill_piece), compareRefillPiece );
689
690        for( j=0; j<poolSize; ++j )
691            pool[j] = p[j].piece;
692
693        tr_free( p );
694    }
695
696    tr_free( peers );
697
698    *pieceCount = poolSize;
699    return pool;
700}
701
702static uint64_t*
703getPreferredBlocks( Torrent * t, uint64_t * setmeCount )
704{
705    uint32_t i;
706    uint32_t pieceCount;
707    uint32_t * pieces;
708    uint64_t *req, *unreq, *ret, *walk;
709    int reqCount, unreqCount;
710    const tr_torrent * tor = t->tor;
711
712    assert( torrentIsLocked( t ) );
713
714    pieces = getPreferredPieces( t, &pieceCount );
715
716    req = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
717    reqCount = 0;
718    unreq = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
719    unreqCount = 0;
720
721    for( i=0; i<pieceCount; ++i ) {
722        const uint32_t index = pieces[i];
723        const int begin = tr_torPieceFirstBlock( tor, index );
724        const int end = begin + tr_torPieceCountBlocks( tor, (int)index );
725        int block;
726        for( block=begin; block<end; ++block )
727            if( tr_cpBlockIsComplete( tor->completion, block ) )
728                continue;
729            else if( tr_bitfieldHas( t->requested, block ) )
730                req[reqCount++] = block;
731            else
732                unreq[unreqCount++] = block;
733    }
734
735    ret = walk = tr_new( uint64_t, unreqCount + reqCount );
736    memcpy( walk, unreq, sizeof(uint64_t) * unreqCount );
737    walk += unreqCount;
738    memcpy( walk, req, sizeof(uint64_t) * reqCount );
739    walk += reqCount;
740    assert( ( walk - ret ) == ( unreqCount + reqCount ) );
741    *setmeCount = walk - ret;
742
743    tr_free( req );
744    tr_free( unreq );
745    tr_free( pieces );
746
747    return ret;
748}
749
750static int
751refillPulse( void * vtorrent )
752{
753    Torrent * t = vtorrent;
754    tr_torrent * tor = t->tor;
755    uint32_t i;
756    int peerCount;
757    tr_peer ** peers;
758    uint64_t blockCount;
759    uint64_t * blocks;
760
761    if( !t->isRunning )
762        return TRUE;
763    if( tr_torrentIsSeed( t->tor ) )
764        return TRUE;
765
766    torrentLock( t );
767    tordbg( t, "Refilling Request Buffers..." );
768
769    blocks = getPreferredBlocks( t, &blockCount );
770    peers = getConnectedPeers( t, &peerCount );
771
772    for( i=0; peerCount && i<blockCount; ++i )
773    {
774        const uint64_t block = blocks[i];
775        const uint32_t index = tr_torBlockPiece( tor, block );
776        const uint32_t begin = (block * tor->blockSize) - (index * tor->info.pieceSize);
777        const uint32_t length = tr_torBlockCountBytes( tor, (int)block );
778        int j;
779        assert( _tr_block( tor, index, begin ) == (int)block );
780        assert( begin < (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
781        assert( (begin + length) <= (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
782
783        /* find a peer who can ask for this block */
784        for( j=0; j<peerCount; )
785        {
786            const int val = tr_peerMsgsAddRequest( peers[j]->msgs, index, begin, length );
787            switch( val )
788            {
789                case TR_ADDREQ_FULL: 
790                case TR_ADDREQ_CLIENT_CHOKED:
791                    memmove( peers+j, peers+j+1, sizeof(tr_peer*)*(--peerCount-j) );
792                    break;
793
794                case TR_ADDREQ_MISSING: 
795                case TR_ADDREQ_DUPLICATE: 
796                    ++j;
797                    break;
798
799                case TR_ADDREQ_OK:
800                    tr_bitfieldAdd( t->requested, block );
801                    j = peerCount;
802                    break;
803
804                default:
805                    assert( 0 && "unhandled value" );
806                    break;
807            }
808        }
809    }
810
811    /* cleanup */
812    tr_free( peers );
813    tr_free( blocks );
814
815    t->refillTimer = NULL;
816    torrentUnlock( t );
817    return FALSE;
818}
819
820static void
821broadcastClientHave( Torrent * t, uint32_t index )
822{
823    int i, size;
824    tr_peer ** peers;
825
826    assert( torrentIsLocked( t ) );
827
828    peers = getConnectedPeers( t, &size );
829    for( i=0; i<size; ++i )
830        tr_peerMsgsHave( peers[i]->msgs, index );
831    tr_free( peers );
832}
833
834static void
835broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
836{
837    int i, size;
838    tr_peer ** peers;
839
840    assert( torrentIsLocked( t ) );
841
842    peers = getConnectedPeers( t, &size );
843    for( i=0; i<size; ++i )
844        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
845    tr_free( peers );
846}
847
848static void
849addStrike( Torrent * t, tr_peer * peer )
850{
851    tordbg( t, "increasing peer %s strike count to %d", tr_peerIoAddrStr(&peer->in_addr,peer->port), peer->strikes+1 );
852
853    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
854    {
855        struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
856        atom->myflags |= MYFLAG_BANNED;
857        peer->doPurge = 1;
858        tordbg( t, "banning peer %s", tr_peerIoAddrStr(&atom->addr,atom->port) );
859    }
860}
861
862static void
863msgsCallbackFunc( void * vpeer, void * vevent, void * vt )
864{
865    tr_peer * peer = vpeer;
866    Torrent * t = (Torrent *) vt;
867    const tr_peermsgs_event * e = (const tr_peermsgs_event *) vevent;
868
869    torrentLock( t );
870
871    switch( e->eventType )
872    {
873        case TR_PEERMSG_NEED_REQ:
874            if( t->refillTimer == NULL )
875                t->refillTimer = tr_timerNew( t->manager->handle,
876                                              refillPulse, t,
877                                              REFILL_PERIOD_MSEC );
878            break;
879
880        case TR_PEERMSG_CANCEL:
881            tr_bitfieldRem( t->requested, _tr_block( t->tor, e->pieceIndex, e->offset ) );
882            break;
883
884        case TR_PEERMSG_CLIENT_HAVE:
885            broadcastClientHave( t, e->pieceIndex );
886            tr_torrentRecheckCompleteness( t->tor );
887            break;
888
889        case TR_PEERMSG_PEER_PROGRESS: {
890            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
891            const int peerIsSeed = e->progress >= 1.0;
892            if( peerIsSeed ) {
893                tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr(&atom->addr,atom->port) );
894                atom->flags |= ADDED_F_SEED_FLAG;
895            } else {
896                tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr(&atom->addr,atom->port) );
897                atom->flags &= ~ADDED_F_SEED_FLAG;
898            } break;
899        }
900
901        case TR_PEERMSG_CLIENT_BLOCK:
902            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
903            break;
904
905        case TR_PEERMSG_GOT_ASSERT_ERROR:
906            addStrike( t, peer );
907            peer->doPurge = 1;
908            break;
909
910        case TR_PEERMSG_GOT_ERROR:
911            peer->doPurge = 1;
912            break;
913
914        default:
915            assert(0);
916    }
917
918    torrentUnlock( t );
919}
920
921static void
922ensureAtomExists( Torrent * t, const struct in_addr * addr, uint16_t port, uint8_t flags, uint8_t from )
923{
924    if( getExistingAtom( t, addr ) == NULL )
925    {
926        struct peer_atom * a;
927        a = tr_new0( struct peer_atom, 1 );
928        a->addr = *addr;
929        a->port = port;
930        a->flags = flags;
931        a->from = from;
932        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr(&a->addr,a->port) );
933        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
934    }
935}
936
937/* FIXME: this is kind of a mess. */
938static void
939myHandshakeDoneCB( tr_handshake    * handshake,
940                   tr_peerIo       * io,
941                   int               isConnected,
942                   const uint8_t   * peer_id,
943                   void            * vmanager )
944{
945    int ok = isConnected;
946    uint16_t port;
947    const struct in_addr * addr;
948    tr_peerMgr * manager = (tr_peerMgr*) vmanager;
949    Torrent * t;
950    tr_handshake * ours;
951
952    assert( io != NULL );
953    assert( isConnected==0 || isConnected==1 );
954
955    t = tr_peerIoHasTorrentHash( io )
956        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
957        : NULL;
958
959    if( tr_peerIoIsIncoming ( io ) )
960        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
961                                        handshake, handshakeCompare );
962    else if( t != NULL )
963        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
964                                        handshake, handshakeCompare );
965    else
966        ours = handshake;
967
968    assert( ours != NULL );
969    assert( ours == handshake );
970
971    if( t != NULL )
972        torrentLock( t );
973
974    addr = tr_peerIoGetAddress( io, &port );
975
976    if( !ok || !t || !t->isRunning )
977    {
978        if( t ) {
979            struct peer_atom * atom = getExistingAtom( t, addr );
980            if( atom )
981                ++atom->numFails;
982        }
983
984        tr_peerIoFree( io );
985    }
986    else /* looking good */
987    {
988        struct peer_atom * atom;
989        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
990        atom = getExistingAtom( t, addr );
991
992        if( atom->myflags & MYFLAG_BANNED )
993        {
994            tordbg( t, "banned peer %s tried to reconnect", tr_peerIoAddrStr(&atom->addr,atom->port) );
995            tr_peerIoFree( io );
996        }
997        else if( tr_ptrArraySize( t->peers ) >= MAX_CONNECTED_PEERS_PER_TORRENT )
998        {
999            tr_peerIoFree( io );
1000        }
1001        else
1002        {
1003            tr_peer * peer = getExistingPeer( t, addr );
1004
1005            if( peer != NULL ) /* we already have this peer */
1006            {
1007                tr_peerIoFree( io );
1008            }
1009            else
1010            {
1011                peer = getPeer( t, addr );
1012                tr_free( peer->client );
1013                peer->client = peer_id ? tr_clientForId( peer_id ) : NULL;
1014                peer->port = port;
1015                peer->io = io;
1016                peer->msgs = tr_peerMsgsNew( t->tor, peer, msgsCallbackFunc, t, &peer->msgsTag );
1017                atom->time = time( NULL );
1018            }
1019        }
1020    }
1021
1022    if( t != NULL )
1023        torrentUnlock( t );
1024}
1025
1026void
1027tr_peerMgrAddIncoming( tr_peerMgr      * manager,
1028                       struct in_addr  * addr,
1029                       uint16_t          port,
1030                       int               socket )
1031{
1032    managerLock( manager );
1033
1034    if( getExistingHandshake( manager->incomingHandshakes, addr ) )
1035    {
1036        tr_netClose( socket );
1037    }
1038    else /* we don't have a connetion to them yet... */
1039    {
1040        tr_peerIo * io;
1041        tr_handshake * handshake;
1042
1043        tordbg( NULL, "Got an INCOMING connection with %s", tr_peerIoAddrStr( addr, port ) );
1044
1045        io = tr_peerIoNewIncoming( manager->handle, addr, port, socket );
1046
1047        handshake = tr_handshakeNew( io,
1048                                     manager->handle->encryptionMode,
1049                                     myHandshakeDoneCB,
1050                                     manager );
1051
1052        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake, handshakeCompare );
1053    }
1054
1055    managerUnlock( manager );
1056}
1057
1058void
1059tr_peerMgrAddPex( tr_peerMgr     * manager,
1060                  const uint8_t  * torrentHash,
1061                  uint8_t          from,
1062                  const tr_pex   * pex,
1063                  int              pexCount )
1064{
1065    Torrent * t;
1066    const tr_pex * end;
1067
1068    managerLock( manager );
1069
1070    t = getExistingTorrent( manager, torrentHash );
1071    for( end=pex+pexCount; pex!=end; ++pex )
1072        ensureAtomExists( t, &pex->in_addr, pex->port, pex->flags, from );
1073
1074    managerUnlock( manager );
1075}
1076
1077void
1078tr_peerMgrAddPeers( tr_peerMgr    * manager,
1079                    const uint8_t * torrentHash,
1080                    uint8_t         from,
1081                    const uint8_t * peerCompact,
1082                    int             peerCount )
1083{
1084    int i;
1085    const uint8_t * walk = peerCompact;
1086    Torrent * t;
1087
1088    managerLock( manager );
1089
1090    t = getExistingTorrent( manager, torrentHash );
1091    for( i=0; t!=NULL && i<peerCount; ++i )
1092    {
1093        struct in_addr addr;
1094        uint16_t port;
1095        memcpy( &addr, walk, 4 ); walk += 4;
1096        memcpy( &port, walk, 2 ); walk += 2;
1097        ensureAtomExists( t, &addr, port, 0, from );
1098    }
1099
1100    managerUnlock( manager );
1101}
1102
1103/**
1104***
1105**/
1106
1107void
1108tr_peerMgrSetBlame( tr_peerMgr     * manager,
1109                    const uint8_t  * torrentHash,
1110                    int              pieceIndex,
1111                    int              success )
1112{
1113    if( !success )
1114    {
1115        int peerCount, i;
1116        Torrent * t = getExistingTorrent( manager, torrentHash );
1117        tr_peer ** peers;
1118
1119        assert( torrentIsLocked( t ) );
1120
1121        peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1122        for( i=0; i<peerCount; ++i )
1123        {
1124            tr_peer * peer = peers[i];
1125            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1126            {
1127                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1128                           tr_peerIoAddrStr(&peer->in_addr,peer->port),
1129                           pieceIndex, (int)peer->strikes+1 );
1130                addStrike( t, peer );
1131            }
1132        }
1133    }
1134}
1135
1136int
1137tr_pexCompare( const void * va, const void * vb )
1138{
1139    const tr_pex * a = (const tr_pex *) va;
1140    const tr_pex * b = (const tr_pex *) vb;
1141    int i = memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
1142    if( i ) return i;
1143    if( a->port < b->port ) return -1;
1144    if( a->port > b->port ) return 1;
1145    return 0;
1146}
1147
1148int tr_pexCompare( const void * a, const void * b );
1149
1150static int
1151peerPrefersCrypto( const tr_peer * peer )
1152{
1153    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1154        return TRUE;
1155
1156    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1157        return FALSE;
1158
1159    return tr_peerIoIsEncrypted( peer->io );
1160};
1161
1162int
1163tr_peerMgrGetPeers( tr_peerMgr      * manager,
1164                    const uint8_t   * torrentHash,
1165                    tr_pex         ** setme_pex )
1166{
1167    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1168    int i, peerCount;
1169    const tr_peer ** peers;
1170    tr_pex * pex;
1171    tr_pex * walk;
1172
1173    torrentLock( (Torrent*)t );
1174
1175    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1176    pex = walk = tr_new( tr_pex, peerCount );
1177
1178    for( i=0; i<peerCount; ++i, ++walk )
1179    {
1180        const tr_peer * peer = peers[i];
1181
1182        walk->in_addr = peer->in_addr;
1183
1184        walk->port = peer->port;
1185
1186        walk->flags = 0;
1187        if( peerPrefersCrypto(peer) )  walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1188        if( peer->progress >= 1.0 )    walk->flags |= ADDED_F_SEED_FLAG;
1189    }
1190
1191    assert( ( walk - pex ) == peerCount );
1192    qsort( pex, peerCount, sizeof(tr_pex), tr_pexCompare );
1193    *setme_pex = pex;
1194
1195    torrentUnlock( (Torrent*)t );
1196
1197    return peerCount;
1198}
1199
1200static int reconnectPulse( void * vtorrent );
1201static int rechokePulse( void * vtorrent );
1202static int swiftPulse( void * vtorrent );
1203
1204void
1205tr_peerMgrStartTorrent( tr_peerMgr     * manager,
1206                        const uint8_t  * torrentHash )
1207{
1208    Torrent * t;
1209
1210    managerLock( manager );
1211
1212    t = getExistingTorrent( manager, torrentHash );
1213
1214    assert( t != NULL );
1215    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1216    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1217    assert( ( t->isRunning != 0 ) == ( t->swiftTimer != NULL ) );
1218
1219    if( !t->isRunning )
1220    {
1221        t->isRunning = 1;
1222
1223        t->reconnectTimer = tr_timerNew( t->manager->handle,
1224                                         reconnectPulse, t,
1225                                         RECONNECT_PERIOD_MSEC );
1226
1227        t->rechokeTimer = tr_timerNew( t->manager->handle,
1228                                       rechokePulse, t,
1229                                       RECHOKE_PERIOD_MSEC );
1230
1231        t->swiftTimer = tr_timerNew( t->manager->handle,
1232                                     swiftPulse, t,
1233                                     SWIFT_PERIOD_MSEC );
1234
1235        reconnectPulse( t );
1236
1237        rechokePulse( t );
1238
1239        swiftPulse( t );
1240    }
1241
1242    managerUnlock( manager );
1243}
1244
1245static void
1246stopTorrent( Torrent * t )
1247{
1248    assert( torrentIsLocked( t ) );
1249
1250    t->isRunning = 0;
1251    tr_timerFree( &t->rechokeTimer );
1252    tr_timerFree( &t->reconnectTimer );
1253    tr_timerFree( &t->swiftTimer );
1254
1255    /* disconnect the peers. */
1256    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)peerDestructor );
1257    tr_ptrArrayClear( t->peers );
1258
1259    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1260     * which removes the handshake from t->outgoingHandshakes... */
1261    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1262        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1263}
1264void
1265tr_peerMgrStopTorrent( tr_peerMgr     * manager,
1266                       const uint8_t  * torrentHash)
1267{
1268    managerLock( manager );
1269
1270    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1271
1272    managerUnlock( manager );
1273}
1274
1275void
1276tr_peerMgrAddTorrent( tr_peerMgr * manager,
1277                      tr_torrent * tor )
1278{
1279    Torrent * t;
1280
1281    managerLock( manager );
1282
1283    assert( tor != NULL );
1284    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1285
1286    t = torrentConstructor( manager, tor );
1287    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1288
1289    managerUnlock( manager );
1290}
1291
1292void
1293tr_peerMgrRemoveTorrent( tr_peerMgr     * manager,
1294                         const uint8_t  * torrentHash )
1295{
1296    Torrent * t;
1297
1298    managerLock( manager );
1299
1300    t = getExistingTorrent( manager, torrentHash );
1301    assert( t != NULL );
1302    stopTorrent( t );
1303    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1304    torrentDestructor( t );
1305
1306    managerUnlock( manager );
1307}
1308
1309void
1310tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1311                               const uint8_t    * torrentHash,
1312                               int8_t           * tab,
1313                               int                tabCount )
1314{
1315    int i;
1316    const Torrent * t;
1317    const tr_torrent * tor;
1318    float interval;
1319
1320    managerLock( (tr_peerMgr*)manager );
1321
1322    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1323    tor = t->tor;
1324    interval = tor->info.pieceCount / (float)tabCount;
1325
1326    memset( tab, 0, tabCount );
1327
1328    for( i=0; i<tabCount; ++i )
1329    {
1330        const int piece = i * interval;
1331
1332        if( tor == NULL )
1333            tab[i] = 0;
1334        else if( tr_cpPieceIsComplete( tor->completion, piece ) )
1335            tab[i] = -1;
1336        else {
1337            int j, peerCount;
1338            const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1339            for( j=0; j<peerCount; ++j )
1340                if( tr_bitfieldHas( peers[j]->have, i ) )
1341                    ++tab[i];
1342        }
1343    }
1344
1345    managerUnlock( (tr_peerMgr*)manager );
1346}
1347
1348/* Returns the pieces that we and/or a connected peer has */
1349tr_bitfield*
1350tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1351                        const uint8_t    * torrentHash )
1352{
1353    int i, size;
1354    const Torrent * t;
1355    const tr_peer ** peers;
1356    tr_bitfield * pieces;
1357
1358    managerLock( (tr_peerMgr*)manager );
1359
1360    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1361    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1362    pieces = tr_bitfieldDup( tr_cpPieceBitfield( t->tor->completion ) );
1363    for( i=0; i<size; ++i )
1364        if( peers[i]->io != NULL )
1365            tr_bitfieldOr( pieces, peers[i]->have );
1366
1367    managerUnlock( (tr_peerMgr*)manager );
1368    return pieces;
1369}
1370
1371int
1372tr_peerMgrHasConnections( const tr_peerMgr * manager,
1373                          const uint8_t    * torrentHash )
1374{
1375    int ret;
1376    const Torrent * t;
1377    managerLock( (tr_peerMgr*)manager );
1378
1379    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1380    ret = t && tr_ptrArraySize( t->peers );
1381
1382    managerUnlock( (tr_peerMgr*)manager );
1383    return ret;
1384}
1385
1386void
1387tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1388                        const uint8_t    * torrentHash,
1389                        int              * setmePeersKnown,
1390                        int              * setmePeersConnected,
1391                        int              * setmePeersSendingToUs,
1392                        int              * setmePeersGettingFromUs,
1393                        int              * setmePeersFrom )
1394{
1395    int i, size;
1396    const Torrent * t;
1397    const tr_peer ** peers;
1398
1399    managerLock( (tr_peerMgr*)manager );
1400
1401    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1402    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1403
1404    *setmePeersKnown          = tr_ptrArraySize( t->pool );
1405    *setmePeersConnected      = 0;
1406    *setmePeersSendingToUs    = 0;
1407    *setmePeersGettingFromUs  = 0;
1408
1409    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1410        setmePeersFrom[i] = 0;
1411
1412    for( i=0; i<size; ++i )
1413    {
1414        const tr_peer * peer = peers[i];
1415        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1416
1417        if( peer->io == NULL ) /* not connected */
1418            continue;
1419
1420        ++*setmePeersConnected;
1421
1422        ++setmePeersFrom[atom->from];
1423
1424        if( peer->rateToPeer > 0.01 )
1425            ++*setmePeersGettingFromUs;
1426
1427        if( peer->rateToClient > 0.01 )
1428            ++*setmePeersSendingToUs;
1429    }
1430
1431    managerUnlock( (tr_peerMgr*)manager );
1432}
1433
1434struct tr_peer_stat *
1435tr_peerMgrPeerStats( const tr_peerMgr  * manager,
1436                     const uint8_t     * torrentHash,
1437                     int               * setmeCount UNUSED )
1438{
1439    int i, size;
1440    const Torrent * t;
1441    tr_peer ** peers;
1442    tr_peer_stat * ret;
1443
1444    assert( manager != NULL );
1445    managerLock( (tr_peerMgr*)manager );
1446
1447    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1448    peers = getConnectedPeers( (Torrent*)t, &size );
1449    ret = tr_new0( tr_peer_stat, size );
1450
1451    for( i=0; i<size; ++i )
1452    {
1453        const tr_peer * peer = peers[i];
1454        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1455        tr_peer_stat * stat = ret + i;
1456
1457        tr_netNtop( &peer->in_addr, stat->addr, sizeof(stat->addr) );
1458        strlcpy( stat->client, (peer->client ? peer->client : ""), sizeof(stat->client) );
1459        stat->port             = peer->port;
1460        stat->from             = atom->from;
1461        stat->progress         = peer->progress;
1462        stat->isEncrypted      = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1463        stat->uploadToRate     = peer->rateToPeer;
1464        stat->downloadFromRate = peer->rateToClient;
1465        stat->isDownloading    = stat->uploadToRate > 0.01;
1466        stat->isUploading      = stat->downloadFromRate > 0.01;
1467        stat->status           = peer->status;
1468    }
1469
1470    *setmeCount = size;
1471    tr_free( peers );
1472
1473    managerUnlock( (tr_peerMgr*)manager );
1474    return ret;
1475}
1476
1477/**
1478***
1479**/
1480
1481struct ChokeData
1482{
1483    tr_peer * peer;
1484    int rate;
1485    int preferred;
1486    int doUnchoke;
1487};
1488
1489static int
1490compareChoke( const void * va, const void * vb )
1491{
1492    const struct ChokeData * a = va;
1493    const struct ChokeData * b = vb;
1494
1495    /* primary key: larger speeds */
1496    if( a->rate > b->rate )
1497        return -1;
1498    if ( a->rate < b->rate )
1499        return 1;
1500
1501    /* secondary key: perferred peers */
1502    if( a->preferred != b->preferred )
1503        return a->preferred ? -1 : 1;
1504
1505    return 0;
1506}
1507
1508static int
1509clientIsSnubbedBy( const tr_peer * peer )
1510{
1511    assert( peer != NULL );
1512
1513    return peer->peerSentPieceDataAt < (time(NULL) - SNUBBED_SEC);
1514}
1515
1516/**
1517***
1518**/
1519
1520static double
1521getWeightedThroughput( const tr_peer * peer, int clientIsSeed )
1522{
1523    return (int)( 10.0 * ( clientIsSeed ? peer->rateToPeer
1524                                        : peer->rateToClient ) );
1525}
1526
1527static void
1528rechoke( Torrent * t )
1529{
1530    int i, peerCount, size=0, unchoked=0;
1531    const time_t fibrillationTime = time(NULL) - MIN_CHOKE_PERIOD_SEC;
1532    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1533    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1534    const int clientIsSeed = tr_torrentIsSeed( t->tor );
1535
1536    assert( torrentIsLocked( t ) );
1537   
1538    /* sort the peers by preference and rate */
1539    for( i=0; i<peerCount; ++i )
1540    {
1541        tr_peer * peer = peers[i];
1542        struct ChokeData * node;
1543        if( peer->chokeChangedAt > fibrillationTime ) {
1544            if( !peer->peerIsChoked )
1545                ++unchoked;
1546            continue;
1547        }
1548
1549        node = &choke[size++];
1550        node->peer = peer;
1551        node->preferred = peer->peerIsInterested && !clientIsSnubbedBy(peer);
1552        node->rate = getWeightedThroughput( peer, clientIsSeed );
1553    }
1554
1555    qsort( choke, size, sizeof(struct ChokeData), compareChoke );
1556
1557    for( i=0; i<size && unchoked<NUM_UNCHOKED_PEERS_PER_TORRENT; ++i ) {
1558        choke[i].doUnchoke = 1;
1559        ++unchoked;
1560    }
1561
1562    for( ; i<size; ++i ) {
1563        ++unchoked;
1564        choke[i].doUnchoke = 1;
1565        if( choke[i].peer->peerIsInterested )
1566            break;
1567    }
1568
1569    for( i=0; i<size; ++i )
1570        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1571
1572    /* cleanup */
1573    tr_free( choke );
1574    tr_free( peers );
1575}
1576
1577static int
1578rechokePulse( void * vtorrent )
1579{
1580    Torrent * t = vtorrent;
1581    torrentLock( t );
1582    rechoke( t );
1583    torrentUnlock( t );
1584    return TRUE;
1585}
1586
1587/***
1588****
1589***/
1590
1591static int
1592swiftPulse( void * vtorrent )
1593{
1594    Torrent * t = vtorrent;
1595    torrentLock( t );
1596
1597    if( !tr_torrentIsSeed( t->tor ) )
1598    {
1599        int i;
1600        int peerCount = 0;
1601        int deadbeatCount = 0;
1602        tr_peer ** peers = getConnectedPeers( t, &peerCount );
1603        tr_peer ** deadbeats = tr_new( tr_peer*, peerCount );
1604
1605        const double ul_KiBsec = tr_rcRate( t->tor->upload );
1606        const double ul_KiB = ul_KiBsec * (SWIFT_PERIOD_MSEC/1000.0);
1607        const double ul_bytes = ul_KiB * 1024;
1608        const double freeCreditTotal = ul_bytes * SWIFT_LARGESSE;
1609        int freeCreditPerPeer;
1610
1611        for( i=0; i<peerCount; ++i ) {
1612            tr_peer * peer = peers[i];
1613            if( peer->credit <= 0 )
1614                deadbeats[deadbeatCount++] =  peer;
1615        }
1616
1617        freeCreditPerPeer = (int)( freeCreditTotal / deadbeatCount );
1618        for( i=0; i<deadbeatCount; ++i )
1619            deadbeats[i]->credit = freeCreditPerPeer;
1620
1621        tordbg( t, "%d deadbeats, "
1622            "who are each being granted %d bytes' credit "
1623            "for a total of %.1f KiB, "
1624            "%d%% of the torrent's ul speed %.1f\n",
1625            deadbeatCount, freeCreditPerPeer,
1626            ul_KiBsec*SWIFT_LARGESSE, (int)(SWIFT_LARGESSE*100), ul_KiBsec );
1627
1628        tr_free( deadbeats );
1629        tr_free( peers );
1630    }
1631
1632    torrentUnlock( t );
1633    return TRUE;
1634}
1635
1636/***
1637****
1638****  Life and Death
1639****
1640***/
1641
1642static int
1643shouldPeerBeClosed( const Torrent * t, const tr_peer * peer, int peerCount )
1644{
1645    const tr_torrent * tor = t->tor;
1646    const time_t now = time( NULL );
1647    const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1648
1649    /* if it's marked for purging, close it */
1650    if( peer->doPurge ) {
1651        tordbg( t, "purging peer %s because its doPurge flag is set", tr_peerIoAddrStr(&atom->addr,atom->port) );
1652        return TRUE;
1653    }
1654
1655    /* if we're both seeds and it's been long enough for a pex exchange, close it */
1656    if( 1 ) {
1657        const int clientIsSeed = tr_torrentIsSeed( tor );
1658        const int peerIsSeed = atom->flags & ADDED_F_SEED_FLAG;
1659        if( peerIsSeed && clientIsSeed && ( !tr_torrentIsPexEnabled(tor) || (now-atom->time>=30) ) ) {
1660            tordbg( t, "purging peer %s because we're both seeds", tr_peerIoAddrStr(&atom->addr,atom->port) );
1661            return TRUE;
1662        }
1663    }
1664
1665    /* disconnect if it's been too long since piece data has been transferred.
1666     * this is on a sliding scale based on number of available peers... */
1667    if( 1 ) {
1668        const int relaxStrictnessIfFewerThanN = (int)((MAX_CONNECTED_PEERS_PER_TORRENT * 0.9) + 0.5);
1669        /* if we have >= relaxIfFewerThan, strictness is 100%.
1670         * if we have zero connections, strictness is 0% */
1671        const double strictness = peerCount >= relaxStrictnessIfFewerThanN
1672            ? 1.0
1673            : peerCount / (double)relaxStrictnessIfFewerThanN;
1674        const int lo = MIN_UPLOAD_IDLE_SECS;
1675        const int hi = MAX_UPLOAD_IDLE_SECS;
1676        const int limit = lo + ((hi-lo) * strictness);
1677        const time_t then = peer->pieceDataActivityDate;
1678        const int idleTime = then ? (now-then) : 0;
1679        if( idleTime > limit ) {
1680            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
1681                       tr_peerIoAddrStr(&atom->addr,atom->port), idleTime );
1682            return TRUE;
1683        }
1684    }
1685
1686    return FALSE;
1687}
1688
1689static tr_peer **
1690getPeersToClose( Torrent * t, int * setmeSize )
1691{
1692    int i, peerCount, outsize;
1693    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( t->peers, &peerCount );
1694    struct tr_peer ** ret = tr_new( tr_peer*, peerCount );
1695
1696    assert( torrentIsLocked( t ) );
1697
1698    for( i=outsize=0; i<peerCount; ++i )
1699        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
1700            ret[outsize++] = peers[i];
1701
1702    *setmeSize = outsize;
1703    return ret;
1704}
1705
1706static int
1707compareCandidates( const void * va, const void * vb )
1708{
1709    const struct peer_atom * a = * (const struct peer_atom**) va;
1710    const struct peer_atom * b = * (const struct peer_atom**) vb;
1711    int i;
1712
1713    if(( i = tr_compareUint16( a->numFails, b->numFails )))
1714        return i;
1715
1716    if( a->time != b->time )
1717        return a->time < b->time ? -1 : 1;
1718
1719    return 0;
1720}
1721
1722static struct peer_atom **
1723getPeerCandidates( Torrent * t, int * setmeSize )
1724{
1725    int i, atomCount, retCount;
1726    struct peer_atom ** atoms;
1727    struct peer_atom ** ret;
1728    const time_t now = time( NULL );
1729    const int seed = tr_torrentIsSeed( t->tor );
1730
1731    assert( torrentIsLocked( t ) );
1732
1733    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &atomCount );
1734    ret = tr_new( struct peer_atom*, atomCount );
1735    for( i=retCount=0; i<atomCount; ++i )
1736    {
1737        int wait, minWait, maxWait;
1738        struct peer_atom * atom = atoms[i];
1739
1740        /* peer fed us too much bad data ... we only keep it around
1741         * now to weed it out in case someone sends it to us via pex */
1742        if( atom->myflags & MYFLAG_BANNED ) {
1743#if 0
1744            tordbg( t, "RECONNECT peer %d (%s) is banned...",
1745                    i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1746#endif
1747            continue;
1748        }
1749
1750        /* we don't need two connections to the same peer... */
1751        if( peerIsInUse( t, &atom->addr ) ) {
1752#if 0
1753            tordbg( t, "RECONNECT peer %d (%s) is in use..",
1754                    i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1755#endif
1756            continue;
1757        }
1758
1759        /* no need to connect if we're both seeds... */
1760        if( seed && (atom->flags & ADDED_F_SEED_FLAG) ) {
1761#if 0
1762            tordbg( t, "RECONNECT peer %d (%s) is a seed and so are we..",
1763                    i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1764#endif
1765            continue;
1766        }
1767
1768        /* we're wasting our time trying to connect to this bozo. */
1769        if( atom->numFails > 10 ) {
1770#if 0
1771            tordbg( t, "RECONNECT peer %d (%s) gives us nothing but failure.",
1772                    i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1773#endif
1774            continue;
1775        }
1776
1777        /* if we used this peer recently, give someone else a turn */
1778        minWait = 10; /* ten seconds */
1779        maxWait = (60 * 20); /* twenty minutes */
1780        wait = atom->numFails * 30; /* add 15 secs to the wait interval for each consecutive failure*/
1781        if( wait < minWait ) wait = minWait;
1782        if( wait > maxWait ) wait = maxWait;
1783        if( ( now - atom->time ) < wait ) {
1784            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
1785                    i, tr_peerIoAddrStr(&atom->addr,atom->port), wait );
1786            continue;
1787        }
1788
1789        ret[retCount++] = atom;
1790    }
1791
1792    qsort( ret, retCount, sizeof(struct peer_atom*), compareCandidates );
1793    *setmeSize = retCount;
1794    return ret;
1795}
1796
1797static int
1798reconnectPulse( void * vtorrent )
1799{
1800    Torrent * t = vtorrent;
1801
1802    torrentLock( t );
1803
1804    if( !t->isRunning )
1805    {
1806        removeAllPeers( t );
1807    }
1808    else
1809    {
1810        int i, nCandidates, nBad;
1811        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
1812        struct tr_peer ** connections = getPeersToClose( t, &nBad );
1813
1814        if( nBad || nCandidates )
1815            tordbg( t, "reconnect pulse for [%s]: %d bad connections, "
1816                       "%d connection candidates, %d atoms, max per pulse is %d",
1817                       t->tor->info.name, nBad, nCandidates,
1818                       tr_ptrArraySize(t->pool),
1819                       (int)MAX_RECONNECTIONS_PER_PULSE );
1820
1821        /* disconnect some peers.
1822           if we got transferred piece data, then they might be good peers,
1823           so reset their `numFails' weight to zero.  otherwise we connected
1824           to them fruitlessly, so mark it as another fail */
1825        for( i=0; i<nBad; ++i ) {
1826            tr_peer * peer = connections[i];
1827            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1828            if( peer->pieceDataActivityDate )
1829                atom->numFails = 0;
1830            else
1831                ++atom->numFails;
1832            removePeer( t, peer );
1833        }
1834
1835        /* add some new ones */
1836        for( i=0; i<nCandidates && i<MAX_RECONNECTIONS_PER_PULSE; ++i )
1837        {
1838            tr_peerMgr * mgr = t->manager;
1839            struct peer_atom * atom = candidates[i];
1840            tr_peerIo * io;
1841            tr_handshake * handshake;
1842
1843            tordbg( t, "Starting an OUTGOING connection with %s",
1844                       tr_peerIoAddrStr( &atom->addr, atom->port ) );
1845
1846            io = tr_peerIoNewOutgoing( mgr->handle, &atom->addr, atom->port, t->hash );
1847
1848            handshake = tr_handshakeNew( io,
1849                                         mgr->handle->encryptionMode,
1850                                         myHandshakeDoneCB,
1851                                         mgr );
1852
1853            assert( tr_peerIoGetTorrentHash( io ) != NULL );
1854
1855            tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake, handshakeCompare );
1856
1857            atom->time = time( NULL );
1858        }
1859
1860        /* cleanup */
1861        tr_free( connections );
1862        tr_free( candidates );
1863    }
1864
1865    torrentUnlock( t );
1866    return TRUE;
1867}
Note: See TracBrowser for help on using the repository browser.