source: trunk/libtransmission/peer-mgr.c @ 3819

Last change on this file since 3819 was 3819, checked in by charles, 15 years ago

since many people seem to be having a hard time holding onto good peers, be a little more lenient on how quickly we hang up on peers that have transferred piece data with us.

  • Property svn:keywords set to Date Rev Author Id
File size: 49.2 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 3819 2007-11-14 05:02:03Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17#include <limits.h> /* INT_MAX */
18
19#include <libgen.h> /* basename */
20#include <arpa/inet.h> /* inet_ntoa */
21
22#include <event.h>
23
24#include "transmission.h"
25#include "clients.h"
26#include "completion.h"
27#include "crypto.h"
28#include "handshake.h"
29#include "net.h"
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-mgr-private.h"
33#include "peer-msgs.h"
34#include "platform.h"
35#include "ptrarray.h"
36#include "ratecontrol.h"
37#include "shared.h"
38#include "trevent.h"
39#include "utils.h"
40
41enum
42{
43    /* how frequently to change which peers are choked */
44    RECHOKE_PERIOD_MSEC = (1000),
45
46    /* how frequently to decide which peers live and die */
47    RECONNECT_PERIOD_MSEC = (5 * 1000),
48
49    /* how frequently to refill peers' request lists */
50    REFILL_PERIOD_MSEC = 666,
51
52    /* don't change a peer's choke status more often than this */
53    MIN_CHOKE_PERIOD_SEC = 10,
54
55    /* following the BT spec, we consider ourselves `snubbed' if
56     * we're we don't get piece data from a peer in this long */
57    SNUBBED_SEC = 60,
58
59    /* arbitrary */
60    MAX_CONNECTED_PEERS_PER_TORRENT = 60,
61
62    /* when many peers are available, keep idle ones this long */
63    MIN_UPLOAD_IDLE_SECS = (60 * 3),
64
65    /* when few peers are available, keep idle ones this long */
66    MAX_UPLOAD_IDLE_SECS = (60 * 10),
67
68    /* how many peers to unchoke per-torrent. */
69    /* FIXME: make this user-configurable? */
70    NUM_UNCHOKED_PEERS_PER_TORRENT = 12, /* arbitrary */
71
72    /* set this too high and there will be a lot of churn.
73     * set it too low and you'll get peers too slowly */
74    MAX_RECONNECTIONS_PER_PULSE = 5,
75
76    /* corresponds to ut_pex's added.f flags */
77    ADDED_F_ENCRYPTION_FLAG = 1,
78    /* corresponds to ut_pex's added.f flags */
79    ADDED_F_SEED_FLAG = 2,
80
81    /* number of bad pieces a peer is allowed to send before we ban them */
82    MAX_BAD_PIECES_PER_PEER = 3,
83    /* use for bitwise operations w/peer_atom.myflags */
84    MYFLAG_BANNED = 1
85};
86
87
88/**
89***
90**/
91
92/* We keep one of these for every peer we know about, whether
93 * it's connected or not, so the struct must be small.
94 * When our current connections underperform, we dip back
95 * into this list for new ones. */
96struct peer_atom
97{   
98    uint8_t from;
99    uint8_t flags; /* these match the added_f flags */
100    uint8_t myflags; /* flags that aren't defined in added_f */
101    uint16_t port;
102    uint16_t numFails;
103    struct in_addr addr;
104    time_t time;
105};
106
107typedef struct
108{
109    uint8_t hash[SHA_DIGEST_LENGTH];
110    tr_ptrArray * outgoingHandshakes; /* tr_handshake */
111    tr_ptrArray * pool; /* struct peer_atom */
112    tr_ptrArray * peers; /* tr_peer */
113    tr_timer * reconnectTimer;
114    tr_timer * rechokeTimer;
115    tr_timer * refillTimer;
116    tr_torrent * tor;
117    tr_bitfield * requested;
118
119    unsigned int isRunning : 1;
120
121    struct tr_peerMgr * manager;
122}
123Torrent;
124
125struct tr_peerMgr
126{
127    tr_handle * handle;
128    tr_ptrArray * torrents; /* Torrent */
129    tr_ptrArray * incomingHandshakes; /* tr_handshake */
130};
131
132/**
133***
134**/
135
136static void
137myDebug( const char * file, int line, const Torrent * t, const char * fmt, ... )
138{
139    FILE * fp = tr_getLog( );
140    if( fp != NULL )
141    {
142        va_list args;
143        char timestr[64];
144        struct evbuffer * buf = evbuffer_new( );
145        char * myfile = tr_strdup( file );
146
147        evbuffer_add_printf( buf, "[%s] ", tr_getLogTimeStr( timestr, sizeof(timestr) ) );
148        if( t != NULL )
149            evbuffer_add_printf( buf, "%s ", t->tor->info.name );
150        va_start( args, fmt );
151        evbuffer_add_vprintf( buf, fmt, args );
152        va_end( args );
153        evbuffer_add_printf( buf, " (%s:%d)\n", basename(myfile), line );
154        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
155
156        tr_free( myfile );
157        evbuffer_free( buf );
158    }
159}
160
161#define tordbg(t, fmt...) myDebug(__FILE__, __LINE__, t, ##fmt )
162
163/**
164***
165**/
166
167static void
168managerLock( struct tr_peerMgr * manager )
169{
170    tr_globalLock( manager->handle );
171}
172static void
173managerUnlock( struct tr_peerMgr * manager )
174{
175    tr_globalUnlock( manager->handle );
176}
177static void
178torrentLock( Torrent * torrent )
179{
180    managerLock( torrent->manager );
181}
182static void
183torrentUnlock( Torrent * torrent )
184{
185    managerUnlock( torrent->manager );
186}
187static int
188torrentIsLocked( const Torrent * t )
189{
190    return tr_globalIsLocked( t->manager->handle );
191}
192
193/**
194***
195**/
196
197static int
198compareAddresses( const struct in_addr * a, const struct in_addr * b )
199{
200    return tr_compareUint32( a->s_addr, b->s_addr );
201}
202
203static int
204handshakeCompareToAddr( const void * va, const void * vb )
205{
206    const tr_handshake * a = va;
207    return compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
208}
209
210static int
211handshakeCompare( const void * a, const void * b )
212{
213    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
214}
215
216static tr_handshake*
217getExistingHandshake( tr_ptrArray * handshakes, const struct in_addr * in_addr )
218{
219    return tr_ptrArrayFindSorted( handshakes,
220                                  in_addr,
221                                  handshakeCompareToAddr );
222}
223
224static int
225comparePeerAtomToAddress( const void * va, const void * vb )
226{
227    const struct peer_atom * a = va;
228    return compareAddresses( &a->addr, vb );
229}
230
231static int
232comparePeerAtoms( const void * va, const void * vb )
233{
234    const struct peer_atom * b = vb;
235    return comparePeerAtomToAddress( va, &b->addr );
236}
237
238/**
239***
240**/
241
242static int
243torrentCompare( const void * va, const void * vb )
244{
245    const Torrent * a = va;
246    const Torrent * b = vb;
247    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
248}
249
250static int
251torrentCompareToHash( const void * va, const void * vb )
252{
253    const Torrent * a = va;
254    const uint8_t * b_hash = vb;
255    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
256}
257
258static Torrent*
259getExistingTorrent( tr_peerMgr * manager, const uint8_t * hash )
260{
261    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
262                                             hash,
263                                             torrentCompareToHash );
264}
265
266static int
267peerCompare( const void * va, const void * vb )
268{
269    const tr_peer * a = va;
270    const tr_peer * b = vb;
271    return compareAddresses( &a->in_addr, &b->in_addr );
272}
273
274static int
275peerCompareToAddr( const void * va, const void * vb )
276{
277    const tr_peer * a = va;
278    return compareAddresses( &a->in_addr, vb );
279}
280
281static tr_peer*
282getExistingPeer( Torrent * torrent, const struct in_addr * in_addr )
283{
284    assert( torrentIsLocked( torrent ) );
285    assert( in_addr != NULL );
286
287    return (tr_peer*) tr_ptrArrayFindSorted( torrent->peers,
288                                             in_addr,
289                                             peerCompareToAddr );
290}
291
292static struct peer_atom*
293getExistingAtom( const Torrent * t, const struct in_addr * addr )
294{
295    assert( torrentIsLocked( t ) );
296    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
297}
298
299static int
300peerIsInUse( const Torrent * ct, const struct in_addr * addr )
301{
302    Torrent * t = (Torrent*) ct;
303
304    assert( torrentIsLocked ( t ) );
305
306    return getExistingPeer( t, addr )
307        || getExistingHandshake( t->outgoingHandshakes, addr )
308        || getExistingHandshake( t->manager->incomingHandshakes, addr );
309}
310
311static tr_peer*
312peerConstructor( const struct in_addr * in_addr )
313{
314    tr_peer * p;
315    p = tr_new0( tr_peer, 1 );
316    p->rcToClient = tr_rcInit( );
317    p->rcToPeer = tr_rcInit( );
318    memcpy( &p->in_addr, in_addr, sizeof(struct in_addr) );
319    return p;
320}
321
322static tr_peer*
323getPeer( Torrent * torrent, const struct in_addr * in_addr )
324{
325    tr_peer * peer;
326
327    assert( torrentIsLocked( torrent ) );
328
329    peer = getExistingPeer( torrent, in_addr );
330
331    if( peer == NULL ) {
332        peer = peerConstructor( in_addr );
333        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
334    }
335
336    return peer;
337}
338
339static void
340peerDestructor( tr_peer * peer )
341{
342    assert( peer != NULL );
343    assert( peer->msgs != NULL );
344
345    tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
346    tr_peerMsgsFree( peer->msgs );
347
348    tr_peerIoFree( peer->io );
349
350    tr_bitfieldFree( peer->have );
351    tr_bitfieldFree( peer->blame );
352    tr_rcClose( peer->rcToClient );
353    tr_rcClose( peer->rcToPeer );
354    tr_free( peer->client );
355    tr_free( peer );
356}
357
358static void
359removePeer( Torrent * t, tr_peer * peer )
360{
361    tr_peer * removed;
362    struct peer_atom * atom;
363
364    assert( torrentIsLocked( t ) );
365
366    atom = getExistingAtom( t, &peer->in_addr );
367    assert( atom != NULL );
368    atom->time = time( NULL );
369
370    removed = tr_ptrArrayRemoveSorted  ( t->peers, peer, peerCompare );
371    assert( removed == peer );
372    peerDestructor( removed );
373}
374
375static void
376removeAllPeers( Torrent * t )
377{
378    while( !tr_ptrArrayEmpty( t->peers ) )
379        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
380}
381
382static void
383torrentDestructor( Torrent * t )
384{
385    uint8_t hash[SHA_DIGEST_LENGTH];
386
387    assert( t != NULL );
388    assert( !t->isRunning );
389    assert( t->peers != NULL );
390    assert( torrentIsLocked( t ) );
391    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
392    assert( tr_ptrArrayEmpty( t->peers ) );
393
394    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
395
396    tr_timerFree( &t->reconnectTimer );
397    tr_timerFree( &t->rechokeTimer );
398    tr_timerFree( &t->refillTimer );
399
400    tr_bitfieldFree( t->requested );
401    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
402    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
403    tr_ptrArrayFree( t->peers, NULL );
404
405    tr_free( t );
406}
407
408static Torrent*
409torrentConstructor( tr_peerMgr * manager, tr_torrent * tor )
410{
411    Torrent * t;
412
413    t = tr_new0( Torrent, 1 );
414    t->manager = manager;
415    t->tor = tor;
416    t->pool = tr_ptrArrayNew( );
417    t->peers = tr_ptrArrayNew( );
418    t->outgoingHandshakes = tr_ptrArrayNew( );
419    t->requested = tr_bitfieldNew( tor->blockCount );
420    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
421
422    return t;
423}
424
425/**
426***
427**/
428
429struct tr_bitfield *
430tr_peerMgrGenerateAllowedSet( const uint32_t         setCount,
431                              const uint32_t         pieceCount,
432                              const uint8_t          infohash[20],
433                              const struct in_addr * ip )
434{
435    /* This has been checked against the spec example implementation. Feeding it with :
436    setCount = 9, pieceCount = 1313, infohash = Oxaa,0xaa,...0xaa, ip = 80.4.4.200
437generate :
438    1059, 431, 808, 1217, 287, 376, 1188, 353, 508
439    but since we're storing in a bitfield, it won't be in this order... */
440    /* TODO : We should translate link-local IPv4 adresses to external IP,
441     * so that being on same local network gives us the same allowed pieces */
442   
443    uint8_t *seed;
444    char buf[4];
445    uint32_t allowedPieceCount = 0;
446    tr_bitfield_t * ret;
447
448#if 0
449    printf( "%d piece allowed fast set for torrent with %d pieces and hex infohash\n", setCount, pieceCount );
450    printf( "%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x for node with IP %s:\n",
451            infohash[0], infohash[1], infohash[2], infohash[3], infohash[4], infohash[5], infohash[6], infohash[7], infohash[8], infohash[9],
452            infohash[10], infohash[11], infohash[12], infohash[13], infohash[14], infohash[15], infohash[16], infohash[7], infohash[18], infohash[19],
453            inet_ntoa( *ip ) );
454#endif
455   
456    seed = malloc(4 + SHA_DIGEST_LENGTH);
457   
458    ret = tr_bitfieldNew( pieceCount );
459   
460    /* We need a seed based on most significant bytes of peer address
461        concatenated with torrent's infohash */
462    *(uint32_t*)buf = ntohl( htonl(ip->s_addr) & 0xffffff00 );
463   
464    memcpy( seed, &buf, 4 );
465    memcpy( seed + 4, infohash, SHA_DIGEST_LENGTH );
466   
467    tr_sha1( seed, seed, 4 + SHA_DIGEST_LENGTH, NULL );
468   
469    while ( allowedPieceCount < setCount )
470    {
471        int i;
472        for ( i = 0 ; i < 5 && allowedPieceCount < setCount ; i++ )
473        {
474            /* We generate indices from 4-byte chunks of the seed */
475            uint32_t j = i * 4;
476            uint32_t y = ntohl( *(uint32_t*)(seed + j) );
477            uint32_t index = y % pieceCount;
478           
479            if ( !tr_bitfieldHas( ret, index ) )
480            {
481                tr_bitfieldAdd( ret, index );
482                allowedPieceCount++;
483            }
484        }
485        /* We randomize the seed, in case we need to iterate more */
486        tr_sha1( seed, seed, SHA_DIGEST_LENGTH, NULL );
487    }
488    tr_free( seed );
489   
490    return ret;
491}
492
493tr_peerMgr*
494tr_peerMgrNew( tr_handle * handle )
495{
496    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
497    m->handle = handle;
498    m->torrents = tr_ptrArrayNew( );
499    m->incomingHandshakes = tr_ptrArrayNew( );
500    return m;
501}
502
503void
504tr_peerMgrFree( tr_peerMgr * manager )
505{
506    managerLock( manager );
507
508    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
509     * the item from manager->handshakes, so this is a little roundabout... */
510    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
511        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
512    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
513
514    /* free the torrents. */
515    tr_ptrArrayFree( manager->torrents, (PtrArrayForeachFunc)torrentDestructor );
516
517    managerUnlock( manager );
518    tr_free( manager );
519}
520
521static tr_peer**
522getConnectedPeers( Torrent * t, int * setmeCount )
523{
524    int i, peerCount, connectionCount;
525    tr_peer **peers;
526    tr_peer **ret;
527
528    assert( torrentIsLocked( t ) );
529
530    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
531    ret = tr_new( tr_peer*, peerCount );
532
533    for( i=connectionCount=0; i<peerCount; ++i )
534        if( peers[i]->msgs != NULL )
535            ret[connectionCount++] = peers[i];
536
537    *setmeCount = connectionCount;
538    return ret;
539}
540
541/***
542****  Refill
543***/
544
545struct tr_refill_piece
546{
547    tr_priority_t priority;
548    int percentDone;
549    uint16_t random;
550    uint32_t piece;
551    uint32_t peerCount;
552    uint32_t fastAllowed;
553    uint32_t suggested;
554};
555
556static int
557compareRefillPiece (const void * aIn, const void * bIn)
558{
559    const struct tr_refill_piece * a = aIn;
560    const struct tr_refill_piece * b = bIn;
561   
562    /* if one *might be* fastallowed to us, get it first...
563     * I'm putting it on top so we prioritize those pieces at
564     * startup, then we'll have them, and we'll be denied access
565     * to them */
566    if (a->fastAllowed != b->fastAllowed)
567        return a->fastAllowed < b->fastAllowed ? -1 : 1;
568   
569    /* if one piece has a higher priority, it goes first */
570    if (a->priority != b->priority)
571        return a->priority > b->priority ? -1 : 1;
572   
573    /* otherwise if one was suggested to us, get it */
574    if (a->suggested != b->suggested)
575        return a->suggested < b->suggested ? -1 : 1;
576
577    /* try to fill partial pieces */
578    if( a->percentDone != b->percentDone )
579        return a->percentDone > b->percentDone ? -1 : 1;
580   
581    /* otherwise if one has fewer peers, it goes first */
582    if (a->peerCount != b->peerCount)
583        return a->peerCount < b->peerCount ? -1 : 1;
584
585    /* otherwise go with our random seed */
586    return tr_compareUint16( a->random, b->random );
587}
588
589static int
590isPieceInteresting( const tr_torrent  * tor,
591                    int                 piece )
592{
593    if( tor->info.pieces[piece].dnd ) /* we don't want it */
594        return 0;
595
596    if( tr_cpPieceIsComplete( tor->completion, piece ) ) /* we have it */
597        return 0;
598
599    return 1;
600}
601
602static uint32_t*
603getPreferredPieces( Torrent     * t,
604                    uint32_t    * pieceCount )
605{
606    const tr_torrent * tor = t->tor;
607    const tr_info * inf = &tor->info;
608    int i;
609    uint32_t poolSize = 0;
610    uint32_t * pool = tr_new( uint32_t, inf->pieceCount );
611    int peerCount;
612    tr_peer** peers;
613
614    assert( torrentIsLocked( t ) );
615
616    peers = getConnectedPeers( t, &peerCount );
617
618    for( i=0; i<inf->pieceCount; ++i )
619        if( isPieceInteresting( tor, i ) )
620            pool[poolSize++] = i;
621
622    /* sort the pool from most interesting to least... */
623    if( poolSize > 1 )
624    {
625        uint32_t j;
626        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
627
628        for( j=0; j<poolSize; ++j )
629        {
630            int k;
631            const int piece = pool[j];
632            struct tr_refill_piece * setme = p + j;
633
634            setme->piece = piece;
635            setme->priority = inf->pieces[piece].priority;
636            setme->peerCount = 0;
637            setme->fastAllowed = 0;
638            setme->random = tr_rand( UINT16_MAX );
639            setme->percentDone = (int)( 100.0 * tr_cpPercentBlocksInPiece( tor->completion, piece ) );
640
641            for( k=0; k<peerCount; ++k ) {
642                const tr_peer * peer = peers[k];
643                if( peer->peerIsInterested && !peer->clientIsChoked && tr_bitfieldHas( peer->have, piece ) )
644                    ++setme->peerCount;
645                /* The fast peer extension doesn't force a peer to actually HAVE a fast-allowed piece,
646                    but we're guaranteed to get the same pieces from different peers,
647                    so we'll build a list and pray one actually have this one */
648                setme->fastAllowed = tr_peerMsgsIsPieceFastAllowed( peer->msgs, i );
649                /* Also, if someone SUGGESTed a piece to us, prioritize it over non-suggested others
650                 */
651                setme->suggested   = tr_peerMsgsIsPieceSuggested( peer->msgs, i );
652            }
653        }
654
655        qsort( p, poolSize, sizeof(struct tr_refill_piece), compareRefillPiece );
656
657        for( j=0; j<poolSize; ++j )
658            pool[j] = p[j].piece;
659
660        tr_free( p );
661    }
662
663    tr_free( peers );
664
665    *pieceCount = poolSize;
666    return pool;
667}
668
669static uint64_t*
670getPreferredBlocks( Torrent * t, uint64_t * setmeCount )
671{
672    uint32_t i;
673    uint32_t pieceCount;
674    uint32_t * pieces;
675    uint64_t *req, *unreq, *ret, *walk;
676    int reqCount, unreqCount;
677    const tr_torrent * tor = t->tor;
678
679    assert( torrentIsLocked( t ) );
680
681    pieces = getPreferredPieces( t, &pieceCount );
682
683    req = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
684    reqCount = 0;
685    unreq = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
686    unreqCount = 0;
687
688    for( i=0; i<pieceCount; ++i ) {
689        const uint32_t index = pieces[i];
690        const int begin = tr_torPieceFirstBlock( tor, index );
691        const int end = begin + tr_torPieceCountBlocks( tor, (int)index );
692        int block;
693        for( block=begin; block<end; ++block )
694            if( tr_cpBlockIsComplete( tor->completion, block ) )
695                continue;
696            else if( tr_bitfieldHas( t->requested, block ) )
697                req[reqCount++] = block;
698            else
699                unreq[unreqCount++] = block;
700    }
701
702    ret = walk = tr_new( uint64_t, unreqCount + reqCount );
703    memcpy( walk, unreq, sizeof(uint64_t) * unreqCount );
704    walk += unreqCount;
705    memcpy( walk, req, sizeof(uint64_t) * reqCount );
706    walk += reqCount;
707    assert( ( walk - ret ) == ( unreqCount + reqCount ) );
708    *setmeCount = walk - ret;
709
710    tr_free( req );
711    tr_free( unreq );
712    tr_free( pieces );
713
714    return ret;
715}
716
717static int
718refillPulse( void * vtorrent )
719{
720    Torrent * t = vtorrent;
721    tr_torrent * tor = t->tor;
722    uint32_t i;
723    int peerCount;
724    tr_peer ** peers;
725    uint64_t blockCount;
726    uint64_t * blocks;
727
728    if( !t->isRunning )
729        return TRUE;
730    if( tr_torrentIsSeed( t->tor ) )
731        return TRUE;
732
733    torrentLock( t );
734    tordbg( t, "Refilling Request Buffers..." );
735
736    blocks = getPreferredBlocks( t, &blockCount );
737    peers = getConnectedPeers( t, &peerCount );
738
739    for( i=0; peerCount && i<blockCount; ++i )
740    {
741        const uint64_t block = blocks[i];
742        const uint32_t index = tr_torBlockPiece( tor, block );
743        const uint32_t begin = (block * tor->blockSize) - (index * tor->info.pieceSize);
744        const uint32_t length = tr_torBlockCountBytes( tor, (int)block );
745        int j;
746        assert( _tr_block( tor, index, begin ) == (int)block );
747        assert( begin < (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
748        assert( (begin + length) <= (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
749
750        /* find a peer who can ask for this block */
751        for( j=0; j<peerCount; )
752        {
753            const int val = tr_peerMsgsAddRequest( peers[j]->msgs, index, begin, length );
754            switch( val )
755            {
756                case TR_ADDREQ_FULL: 
757                case TR_ADDREQ_CLIENT_CHOKED:
758                    memmove( peers+j, peers+j+1, sizeof(tr_peer*)*(--peerCount-j) );
759                    break;
760
761                case TR_ADDREQ_MISSING: 
762                case TR_ADDREQ_DUPLICATE: 
763                    ++j;
764                    break;
765
766                case TR_ADDREQ_OK:
767                    tr_bitfieldAdd( t->requested, block );
768                    j = peerCount;
769                    break;
770
771                default:
772                    assert( 0 && "unhandled value" );
773                    break;
774            }
775        }
776    }
777
778    /* cleanup */
779    tr_free( peers );
780    tr_free( blocks );
781
782    t->refillTimer = NULL;
783    torrentUnlock( t );
784    return FALSE;
785}
786
787static void
788broadcastClientHave( Torrent * t, uint32_t index )
789{
790    int i, size;
791    tr_peer ** peers;
792
793    assert( torrentIsLocked( t ) );
794
795    peers = getConnectedPeers( t, &size );
796    for( i=0; i<size; ++i )
797        tr_peerMsgsHave( peers[i]->msgs, index );
798    tr_free( peers );
799}
800
801static void
802broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
803{
804    int i, size;
805    tr_peer ** peers;
806
807    assert( torrentIsLocked( t ) );
808
809    peers = getConnectedPeers( t, &size );
810    for( i=0; i<size; ++i )
811        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
812    tr_free( peers );
813}
814
815static void
816addStrike( Torrent * t, tr_peer * peer )
817{
818    tordbg( t, "increasing peer %s strike count to %d", tr_peerIoAddrStr(&peer->in_addr,peer->port), peer->strikes+1 );
819
820    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
821    {
822        struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
823        atom->myflags |= MYFLAG_BANNED;
824        peer->doPurge = 1;
825        tordbg( t, "banning peer %s", tr_peerIoAddrStr(&atom->addr,atom->port) );
826    }
827}
828
829static void
830msgsCallbackFunc( void * vpeer, void * vevent, void * vt )
831{
832    tr_peer * peer = vpeer;
833    Torrent * t = (Torrent *) vt;
834    const tr_peermsgs_event * e = (const tr_peermsgs_event *) vevent;
835
836    torrentLock( t );
837
838    switch( e->eventType )
839    {
840        case TR_PEERMSG_NEED_REQ:
841            if( t->refillTimer == NULL )
842                t->refillTimer = tr_timerNew( t->manager->handle,
843                                              refillPulse, t,
844                                              REFILL_PERIOD_MSEC );
845            break;
846
847        case TR_PEERMSG_CANCEL:
848            tr_bitfieldRem( t->requested, _tr_block( t->tor, e->pieceIndex, e->offset ) );
849            break;
850
851        case TR_PEERMSG_CLIENT_HAVE:
852            broadcastClientHave( t, e->pieceIndex );
853            tr_torrentRecheckCompleteness( t->tor );
854            break;
855
856        case TR_PEERMSG_PEER_PROGRESS: {
857            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
858            const int peerIsSeed = e->progress >= 1.0;
859            if( peerIsSeed ) {
860                tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr(&atom->addr,atom->port) );
861                atom->flags |= ADDED_F_SEED_FLAG;
862            } else {
863                tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr(&atom->addr,atom->port) );
864                atom->flags &= ~ADDED_F_SEED_FLAG;
865            } break;
866        }
867
868        case TR_PEERMSG_CLIENT_BLOCK:
869            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
870            break;
871
872        case TR_PEERMSG_GOT_ASSERT_ERROR:
873            addStrike( t, peer );
874            peer->doPurge = 1;
875            break;
876
877        case TR_PEERMSG_GOT_ERROR:
878            peer->doPurge = 1;
879            break;
880
881        default:
882            assert(0);
883    }
884
885    torrentUnlock( t );
886}
887
888static void
889ensureAtomExists( Torrent * t, const struct in_addr * addr, uint16_t port, uint8_t flags, uint8_t from )
890{
891    if( getExistingAtom( t, addr ) == NULL )
892    {
893        struct peer_atom * a;
894        a = tr_new0( struct peer_atom, 1 );
895        a->addr = *addr;
896        a->port = port;
897        a->flags = flags;
898        a->from = from;
899        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr(&a->addr,a->port) );
900        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
901    }
902}
903
904/* FIXME: this is kind of a mess. */
905static void
906myHandshakeDoneCB( tr_handshake    * handshake,
907                   tr_peerIo       * io,
908                   int               isConnected,
909                   const uint8_t   * peer_id,
910                   void            * vmanager )
911{
912    int ok = isConnected;
913    uint16_t port;
914    const struct in_addr * addr;
915    tr_peerMgr * manager = (tr_peerMgr*) vmanager;
916    Torrent * t;
917    tr_handshake * ours;
918
919    assert( io != NULL );
920    assert( isConnected==0 || isConnected==1 );
921
922    t = tr_peerIoHasTorrentHash( io )
923        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
924        : NULL;
925
926    if( tr_peerIoIsIncoming ( io ) )
927        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
928                                        handshake, handshakeCompare );
929    else if( t != NULL )
930        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
931                                        handshake, handshakeCompare );
932    else
933        ours = handshake;
934
935    assert( ours != NULL );
936    assert( ours == handshake );
937
938    if( t != NULL )
939        torrentLock( t );
940
941    addr = tr_peerIoGetAddress( io, &port );
942
943    if( !ok || !t || !t->isRunning )
944    {
945        if( t ) {
946            struct peer_atom * atom = getExistingAtom( t, addr );
947            if( atom )
948                ++atom->numFails;
949        }
950
951        tr_peerIoFree( io );
952    }
953    else /* looking good */
954    {
955        struct peer_atom * atom;
956        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
957        atom = getExistingAtom( t, addr );
958
959        if( atom->myflags & MYFLAG_BANNED )
960        {
961            tordbg( t, "banned peer %s tried to reconnect", tr_peerIoAddrStr(&atom->addr,atom->port) );
962            tr_peerIoFree( io );
963        }
964        else if( tr_ptrArraySize( t->peers ) >= MAX_CONNECTED_PEERS_PER_TORRENT )
965        {
966            tr_peerIoFree( io );
967        }
968        else
969        {
970            tr_peer * peer = getExistingPeer( t, addr );
971
972            if( peer != NULL ) /* we already have this peer */
973            {
974                tr_peerIoFree( io );
975            }
976            else
977            {
978                peer = getPeer( t, addr );
979                tr_free( peer->client );
980                peer->client = peer_id ? tr_clientForId( peer_id ) : NULL;
981                peer->port = port;
982                peer->io = io;
983                peer->msgs = tr_peerMsgsNew( t->tor, peer, msgsCallbackFunc, t, &peer->msgsTag );
984                atom->time = time( NULL );
985            }
986        }
987    }
988
989    if( t != NULL )
990        torrentUnlock( t );
991}
992
993void
994tr_peerMgrAddIncoming( tr_peerMgr      * manager,
995                       struct in_addr  * addr,
996                       uint16_t          port,
997                       int               socket )
998{
999    managerLock( manager );
1000
1001    if( getExistingHandshake( manager->incomingHandshakes, addr ) )
1002    {
1003        tr_netClose( socket );
1004    }
1005    else /* we don't have a connetion to them yet... */
1006    {
1007        tr_peerIo * io;
1008        tr_handshake * handshake;
1009
1010        tordbg( NULL, "Got an INCOMING connection with %s", tr_peerIoAddrStr( addr, port ) );
1011
1012        io = tr_peerIoNewIncoming( manager->handle, addr, port, socket );
1013
1014        handshake = tr_handshakeNew( io,
1015                                     manager->handle->encryptionMode,
1016                                     myHandshakeDoneCB,
1017                                     manager );
1018
1019        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake, handshakeCompare );
1020    }
1021
1022    managerUnlock( manager );
1023}
1024
1025void
1026tr_peerMgrAddPex( tr_peerMgr     * manager,
1027                  const uint8_t  * torrentHash,
1028                  uint8_t          from,
1029                  const tr_pex   * pex,
1030                  int              pexCount )
1031{
1032    Torrent * t;
1033    const tr_pex * end;
1034
1035    managerLock( manager );
1036
1037    t = getExistingTorrent( manager, torrentHash );
1038    for( end=pex+pexCount; pex!=end; ++pex )
1039        ensureAtomExists( t, &pex->in_addr, pex->port, pex->flags, from );
1040
1041    managerUnlock( manager );
1042}
1043
1044void
1045tr_peerMgrAddPeers( tr_peerMgr    * manager,
1046                    const uint8_t * torrentHash,
1047                    uint8_t         from,
1048                    const uint8_t * peerCompact,
1049                    int             peerCount )
1050{
1051    int i;
1052    const uint8_t * walk = peerCompact;
1053    Torrent * t;
1054
1055    managerLock( manager );
1056
1057    t = getExistingTorrent( manager, torrentHash );
1058    for( i=0; t!=NULL && i<peerCount; ++i )
1059    {
1060        struct in_addr addr;
1061        uint16_t port;
1062        memcpy( &addr, walk, 4 ); walk += 4;
1063        memcpy( &port, walk, 2 ); walk += 2;
1064        ensureAtomExists( t, &addr, port, 0, from );
1065    }
1066
1067    managerUnlock( manager );
1068}
1069
1070/**
1071***
1072**/
1073
1074void
1075tr_peerMgrSetBlame( tr_peerMgr     * manager,
1076                    const uint8_t  * torrentHash,
1077                    int              pieceIndex,
1078                    int              success )
1079{
1080    if( !success )
1081    {
1082        int peerCount, i;
1083        Torrent * t = getExistingTorrent( manager, torrentHash );
1084        tr_peer ** peers;
1085
1086        assert( torrentIsLocked( t ) );
1087
1088        peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1089        for( i=0; i<peerCount; ++i )
1090        {
1091            tr_peer * peer = peers[i];
1092            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1093            {
1094                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1095                           tr_peerIoAddrStr(&peer->in_addr,peer->port),
1096                           pieceIndex, (int)peer->strikes+1 );
1097                addStrike( t, peer );
1098            }
1099        }
1100    }
1101}
1102
1103int
1104tr_pexCompare( const void * va, const void * vb )
1105{
1106    const tr_pex * a = (const tr_pex *) va;
1107    const tr_pex * b = (const tr_pex *) vb;
1108    int i = memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
1109    if( i ) return i;
1110    if( a->port < b->port ) return -1;
1111    if( a->port > b->port ) return 1;
1112    return 0;
1113}
1114
1115int tr_pexCompare( const void * a, const void * b );
1116
1117static int
1118peerPrefersCrypto( const tr_peer * peer )
1119{
1120    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1121        return TRUE;
1122
1123    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1124        return FALSE;
1125
1126    return tr_peerIoIsEncrypted( peer->io );
1127};
1128
1129int
1130tr_peerMgrGetPeers( tr_peerMgr      * manager,
1131                    const uint8_t   * torrentHash,
1132                    tr_pex         ** setme_pex )
1133{
1134    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1135    int i, peerCount;
1136    const tr_peer ** peers;
1137    tr_pex * pex;
1138    tr_pex * walk;
1139
1140    torrentLock( (Torrent*)t );
1141
1142    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1143    pex = walk = tr_new( tr_pex, peerCount );
1144
1145    for( i=0; i<peerCount; ++i, ++walk )
1146    {
1147        const tr_peer * peer = peers[i];
1148
1149        walk->in_addr = peer->in_addr;
1150
1151        walk->port = peer->port;
1152
1153        walk->flags = 0;
1154        if( peerPrefersCrypto(peer) )  walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1155        if( peer->progress >= 1.0 )    walk->flags |= ADDED_F_SEED_FLAG;
1156    }
1157
1158    assert( ( walk - pex ) == peerCount );
1159    qsort( pex, peerCount, sizeof(tr_pex), tr_pexCompare );
1160    *setme_pex = pex;
1161
1162    torrentUnlock( (Torrent*)t );
1163
1164    return peerCount;
1165}
1166
1167static int reconnectPulse( void * vtorrent );
1168static int rechokePulse( void * vtorrent );
1169
1170void
1171tr_peerMgrStartTorrent( tr_peerMgr     * manager,
1172                        const uint8_t  * torrentHash )
1173{
1174    Torrent * t;
1175
1176    managerLock( manager );
1177
1178    t = getExistingTorrent( manager, torrentHash );
1179
1180    assert( t != NULL );
1181    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1182    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1183
1184    if( !t->isRunning )
1185    {
1186        t->isRunning = 1;
1187
1188        t->reconnectTimer = tr_timerNew( t->manager->handle,
1189                                         reconnectPulse, t,
1190                                         RECONNECT_PERIOD_MSEC );
1191
1192        t->rechokeTimer = tr_timerNew( t->manager->handle,
1193                                       rechokePulse, t,
1194                                       RECHOKE_PERIOD_MSEC );
1195
1196        reconnectPulse( t );
1197
1198        rechokePulse( t );
1199    }
1200
1201    managerUnlock( manager );
1202}
1203
1204static void
1205stopTorrent( Torrent * t )
1206{
1207    assert( torrentIsLocked( t ) );
1208
1209    t->isRunning = 0;
1210    tr_timerFree( &t->rechokeTimer );
1211    tr_timerFree( &t->reconnectTimer );
1212
1213    /* disconnect the peers. */
1214    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)peerDestructor );
1215    tr_ptrArrayClear( t->peers );
1216
1217    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1218     * which removes the handshake from t->outgoingHandshakes... */
1219    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1220        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1221}
1222void
1223tr_peerMgrStopTorrent( tr_peerMgr     * manager,
1224                       const uint8_t  * torrentHash)
1225{
1226    managerLock( manager );
1227
1228    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1229
1230    managerUnlock( manager );
1231}
1232
1233void
1234tr_peerMgrAddTorrent( tr_peerMgr * manager,
1235                      tr_torrent * tor )
1236{
1237    Torrent * t;
1238
1239    managerLock( manager );
1240
1241    assert( tor != NULL );
1242    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1243
1244    t = torrentConstructor( manager, tor );
1245    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1246
1247    managerUnlock( manager );
1248}
1249
1250void
1251tr_peerMgrRemoveTorrent( tr_peerMgr     * manager,
1252                         const uint8_t  * torrentHash )
1253{
1254    Torrent * t;
1255
1256    managerLock( manager );
1257
1258    t = getExistingTorrent( manager, torrentHash );
1259    assert( t != NULL );
1260    stopTorrent( t );
1261    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1262    torrentDestructor( t );
1263
1264    managerUnlock( manager );
1265}
1266
1267void
1268tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1269                               const uint8_t    * torrentHash,
1270                               int8_t           * tab,
1271                               int                tabCount )
1272{
1273    int i;
1274    const Torrent * t;
1275    const tr_torrent * tor;
1276    float interval;
1277
1278    managerLock( (tr_peerMgr*)manager );
1279
1280    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1281    tor = t->tor;
1282    interval = tor->info.pieceCount / (float)tabCount;
1283
1284    memset( tab, 0, tabCount );
1285
1286    for( i=0; i<tabCount; ++i )
1287    {
1288        const int piece = i * interval;
1289
1290        if( tor == NULL )
1291            tab[i] = 0;
1292        else if( tr_cpPieceIsComplete( tor->completion, piece ) )
1293            tab[i] = -1;
1294        else {
1295            int j, peerCount;
1296            const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1297            for( j=0; j<peerCount; ++j )
1298                if( tr_bitfieldHas( peers[j]->have, i ) )
1299                    ++tab[i];
1300        }
1301    }
1302
1303    managerUnlock( (tr_peerMgr*)manager );
1304}
1305
1306/* Returns the pieces that we and/or a connected peer has */
1307tr_bitfield*
1308tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1309                        const uint8_t    * torrentHash )
1310{
1311    int i, size;
1312    const Torrent * t;
1313    const tr_peer ** peers;
1314    tr_bitfield * pieces;
1315
1316    managerLock( (tr_peerMgr*)manager );
1317
1318    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1319    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1320    pieces = tr_bitfieldDup( tr_cpPieceBitfield( t->tor->completion ) );
1321    for( i=0; i<size; ++i )
1322        if( peers[i]->io != NULL )
1323            tr_bitfieldOr( pieces, peers[i]->have );
1324
1325    managerUnlock( (tr_peerMgr*)manager );
1326    return pieces;
1327}
1328
1329int
1330tr_peerMgrHasConnections( const tr_peerMgr * manager,
1331                          const uint8_t    * torrentHash )
1332{
1333    int ret;
1334    const Torrent * t;
1335    managerLock( (tr_peerMgr*)manager );
1336
1337    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1338    ret = t && tr_ptrArraySize( t->peers );
1339
1340    managerUnlock( (tr_peerMgr*)manager );
1341    return ret;
1342}
1343
1344void
1345tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1346                        const uint8_t    * torrentHash,
1347                        int              * setmePeersKnown,
1348                        int              * setmePeersConnected,
1349                        int              * setmePeersSendingToUs,
1350                        int              * setmePeersGettingFromUs,
1351                        int              * setmePeersFrom )
1352{
1353    int i, size;
1354    const Torrent * t;
1355    const tr_peer ** peers;
1356
1357    managerLock( (tr_peerMgr*)manager );
1358
1359    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1360    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1361
1362    *setmePeersKnown          = tr_ptrArraySize( t->pool );
1363    *setmePeersConnected      = 0;
1364    *setmePeersSendingToUs    = 0;
1365    *setmePeersGettingFromUs  = 0;
1366
1367    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1368        setmePeersFrom[i] = 0;
1369
1370    for( i=0; i<size; ++i )
1371    {
1372        const tr_peer * peer = peers[i];
1373        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1374
1375        if( peer->io == NULL ) /* not connected */
1376            continue;
1377
1378        ++*setmePeersConnected;
1379
1380        ++setmePeersFrom[atom->from];
1381
1382        if( peer->rateToPeer > 0.01 )
1383            ++*setmePeersGettingFromUs;
1384
1385        if( peer->rateToClient > 0.01 )
1386            ++*setmePeersSendingToUs;
1387    }
1388
1389    managerUnlock( (tr_peerMgr*)manager );
1390}
1391
1392struct tr_peer_stat *
1393tr_peerMgrPeerStats( const tr_peerMgr  * manager,
1394                     const uint8_t     * torrentHash,
1395                     int               * setmeCount UNUSED )
1396{
1397    int i, size;
1398    const Torrent * t;
1399    tr_peer ** peers;
1400    tr_peer_stat * ret;
1401
1402    assert( manager != NULL );
1403    managerLock( (tr_peerMgr*)manager );
1404
1405    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1406    peers = getConnectedPeers( (Torrent*)t, &size );
1407    ret = tr_new0( tr_peer_stat, size );
1408
1409    for( i=0; i<size; ++i )
1410    {
1411        const tr_peer * peer = peers[i];
1412        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1413        tr_peer_stat * stat = ret + i;
1414
1415        tr_netNtop( &peer->in_addr, stat->addr, sizeof(stat->addr) );
1416        stat->port             = peer->port;
1417        stat->from             = atom->from;
1418        stat->client           = tr_strdup( peer->client ? peer->client : "" );
1419        stat->progress         = peer->progress;
1420        stat->isEncrypted      = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1421        stat->uploadToRate     = peer->rateToPeer;
1422        stat->downloadFromRate = peer->rateToClient;
1423        stat->isDownloading    = stat->uploadToRate > 0.01;
1424        stat->isUploading      = stat->downloadFromRate > 0.01;
1425    }
1426
1427    *setmeCount = size;
1428    tr_free( peers );
1429
1430    managerUnlock( (tr_peerMgr*)manager );
1431    return ret;
1432}
1433
1434/**
1435***
1436**/
1437
1438struct ChokeData
1439{
1440    tr_peer * peer;
1441    float rate;
1442    int randomKey;
1443    int preferred;
1444    int doUnchoke;
1445};
1446
1447static int
1448compareChoke( const void * va, const void * vb )
1449{
1450    const struct ChokeData * a = va;
1451    const struct ChokeData * b = vb;
1452
1453    if( a->preferred != b->preferred )
1454        return a->preferred ? -1 : 1;
1455
1456    if( a->preferred )
1457    {
1458        if( a->rate > b->rate ) return -1;
1459        if( a->rate < b->rate ) return 1;
1460        return 0;
1461    }
1462    else
1463    {
1464        return a->randomKey - b->randomKey;
1465    }
1466}
1467
1468static int
1469clientIsSnubbedBy( const tr_peer * peer )
1470{
1471    assert( peer != NULL );
1472
1473    return peer->peerSentPieceDataAt < (time(NULL) - SNUBBED_SEC);
1474}
1475
1476/**
1477***
1478**/
1479
1480static double
1481getWeightedThroughput( const tr_peer * peer )
1482{
1483    return ( 3 * peer->rateToPeer )
1484         + ( 1 * peer->rateToClient );
1485}
1486
1487static void
1488rechoke( Torrent * t )
1489{
1490    int i, peerCount, size=0, unchoked=0;
1491    const time_t fibrillationTime = time(NULL) - MIN_CHOKE_PERIOD_SEC;
1492    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1493    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1494
1495    assert( torrentIsLocked( t ) );
1496   
1497    /* sort the peers by preference and rate */
1498    for( i=0; i<peerCount; ++i )
1499    {
1500        tr_peer * peer = peers[i];
1501        struct ChokeData * node;
1502        if( peer->chokeChangedAt > fibrillationTime )
1503            continue;
1504
1505        node = &choke[size++];
1506        node->peer = peer;
1507        node->preferred = peer->peerIsInterested && !clientIsSnubbedBy(peer);
1508        node->randomKey = tr_rand( INT_MAX );
1509        node->rate = getWeightedThroughput( peer );
1510    }
1511
1512    qsort( choke, size, sizeof(struct ChokeData), compareChoke );
1513
1514    for( i=0; i<size && i<NUM_UNCHOKED_PEERS_PER_TORRENT; ++i ) {
1515        choke[i].doUnchoke = 1;
1516        ++unchoked;
1517    }
1518
1519    for( ; i<size; ++i ) {
1520        choke[i].doUnchoke = 1;
1521        ++unchoked;
1522        if( choke[i].peer->peerIsInterested )
1523            break;
1524    }
1525
1526    for( i=0; i<size; ++i )
1527        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1528
1529    /* cleanup */
1530    tr_free( choke );
1531    tr_free( peers );
1532}
1533
1534static int
1535rechokePulse( void * vtorrent )
1536{
1537    Torrent * t = vtorrent;
1538    torrentLock( t );
1539    rechoke( t );
1540    torrentUnlock( t );
1541    return TRUE;
1542}
1543
1544/***
1545****
1546****  Life and Death
1547****
1548***/
1549
1550static int
1551shouldPeerBeClosed( const Torrent * t, const tr_peer * peer, int peerCount )
1552{
1553    const tr_torrent * tor = t->tor;
1554    const time_t now = time( NULL );
1555    const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1556
1557    /* if it's marked for purging, close it */
1558    if( peer->doPurge ) {
1559        tordbg( t, "purging peer %s because its doPurge flag is set", tr_peerIoAddrStr(&atom->addr,atom->port) );
1560        return TRUE;
1561    }
1562
1563    /* if we're both seeds and it's been long enough for a pex exchange, close it */
1564    if( 1 ) {
1565        const int clientIsSeed = tr_torrentIsSeed( tor );
1566        const int peerIsSeed = atom->flags & ADDED_F_SEED_FLAG;
1567        if( peerIsSeed && clientIsSeed && ( !tr_torrentIsPexEnabled(tor) || (now-atom->time>=30) ) ) {
1568            tordbg( t, "purging peer %s because we're both seeds", tr_peerIoAddrStr(&atom->addr,atom->port) );
1569            return TRUE;
1570        }
1571    }
1572
1573    /* disconnect if it's been too long since piece data has been transferred.
1574     * this is on a sliding scale based on number of available peers... */
1575    if( 1 ) {
1576        const int relaxStrictnessIfFewerThanN = (int)((MAX_CONNECTED_PEERS_PER_TORRENT * 0.9) + 0.5);
1577        /* if we have >= relaxIfFewerThan, strictness is 100%.
1578         * if we have zero connections, strictness is 0% */
1579        const double strictness = peerCount >= relaxStrictnessIfFewerThanN
1580            ? 1.0
1581            : peerCount / (double)relaxStrictnessIfFewerThanN;
1582        const int lo = MIN_UPLOAD_IDLE_SECS;
1583        const int hi = MAX_UPLOAD_IDLE_SECS;
1584        const int limit = lo + ((hi-lo) * strictness);
1585        const time_t then = peer->pieceDataActivityDate;
1586        const int idleTime = then ? (now-then) : 0;
1587        if( idleTime > limit ) {
1588            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
1589                       tr_peerIoAddrStr(&atom->addr,atom->port), idleTime );
1590            return TRUE;
1591        }
1592    }
1593
1594    return FALSE;
1595}
1596
1597static tr_peer **
1598getPeersToClose( Torrent * t, int * setmeSize )
1599{
1600    int i, peerCount, outsize;
1601    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( t->peers, &peerCount );
1602    struct tr_peer ** ret = tr_new( tr_peer*, peerCount );
1603
1604    assert( torrentIsLocked( t ) );
1605
1606    for( i=outsize=0; i<peerCount; ++i )
1607        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
1608            ret[outsize++] = peers[i];
1609
1610    *setmeSize = outsize;
1611    return ret;
1612}
1613
1614static int
1615compareCandidates( const void * va, const void * vb )
1616{
1617    const struct peer_atom * a = * (const struct peer_atom**) va;
1618    const struct peer_atom * b = * (const struct peer_atom**) vb;
1619    int i;
1620
1621    if(( i = tr_compareUint16( a->numFails, b->numFails )))
1622        return i;
1623
1624    if( a->time != b->time )
1625        return a->time < b->time ? -1 : 1;
1626
1627    return 0;
1628}
1629
1630static struct peer_atom **
1631getPeerCandidates( Torrent * t, int * setmeSize )
1632{
1633    int i, atomCount, retCount;
1634    struct peer_atom ** atoms;
1635    struct peer_atom ** ret;
1636    const time_t now = time( NULL );
1637    const int seed = tr_torrentIsSeed( t->tor );
1638
1639    assert( torrentIsLocked( t ) );
1640
1641    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &atomCount );
1642    ret = tr_new( struct peer_atom*, atomCount );
1643    for( i=retCount=0; i<atomCount; ++i )
1644    {
1645        int wait, minWait, maxWait;
1646        struct peer_atom * atom = atoms[i];
1647
1648        /* peer fed us too much bad data ... we only keep it around
1649         * now to weed it out in case someone sends it to us via pex */
1650        if( atom->myflags & MYFLAG_BANNED ) {
1651            tordbg( t, "RECONNECT peer %d (%s) is banned...",
1652                    i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1653            continue;
1654        }
1655
1656        /* we don't need two connections to the same peer... */
1657        if( peerIsInUse( t, &atom->addr ) ) {
1658            tordbg( t, "RECONNECT peer %d (%s) is in use..",
1659                    i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1660            continue;
1661        }
1662
1663        /* no need to connect if we're both seeds... */
1664        if( seed && (atom->flags & ADDED_F_SEED_FLAG) ) {
1665            tordbg( t, "RECONNECT peer %d (%s) is a seed and so are we..",
1666                    i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1667            continue;
1668        }
1669
1670        /* we're wasting our time trying to connect to this bozo. */
1671        if( atom->numFails > 10 ) {
1672            tordbg( t, "RECONNECT peer %d (%s) gives us nothing but failure.",
1673                    i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1674            continue;
1675        }
1676
1677        /* if we used this peer recently, give someone else a turn */
1678        minWait = 10; /* ten seconds */
1679        maxWait = (60 * 20); /* twenty minutes */
1680        wait = atom->numFails * 30; /* add 15 secs to the wait interval for each consecutive failure*/
1681        if( wait < minWait ) wait = minWait;
1682        if( wait > maxWait ) wait = maxWait;
1683        if( ( now - atom->time ) < wait ) {
1684            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
1685                    i, tr_peerIoAddrStr(&atom->addr,atom->port), wait );
1686            continue;
1687        }
1688
1689        ret[retCount++] = atom;
1690    }
1691
1692    qsort( ret, retCount, sizeof(struct peer_atom*), compareCandidates );
1693    *setmeSize = retCount;
1694    return ret;
1695}
1696
1697static int
1698reconnectPulse( void * vtorrent )
1699{
1700    Torrent * t = vtorrent;
1701
1702    torrentLock( t );
1703
1704    if( !t->isRunning )
1705    {
1706        removeAllPeers( t );
1707    }
1708    else
1709    {
1710        int i, nCandidates, nBad, nAdd;
1711        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
1712        struct tr_peer ** connections = getPeersToClose( t, &nBad );
1713        const int peerCount = tr_ptrArraySize( t->peers );
1714
1715        if( nBad || nCandidates )
1716            tordbg( t, "reconnect pulse for [%s]: %d bad connections, "
1717                       "%d connection candidates, %d atoms, max per pulse is %d",
1718                       t->tor->info.name, nBad, nCandidates,
1719                       tr_ptrArraySize(t->pool),
1720                       (int)MAX_RECONNECTIONS_PER_PULSE );
1721
1722        /* disconnect some peers.
1723           if we got transferred piece data, then they might be good peers,
1724           so reset their `numFails' weight to zero.  otherwise we connected
1725           to them fruitlessly, so mark it as another fail */
1726        for( i=0; i<nBad; ++i ) {
1727            tr_peer * peer = connections[i];
1728            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1729            if( peer->pieceDataActivityDate )
1730                atom->numFails = 0;
1731            else
1732                ++atom->numFails;
1733            removePeer( t, peer );
1734        }
1735
1736        /* add some new ones */
1737        nAdd = !peerCount ? MAX_CONNECTED_PEERS_PER_TORRENT
1738                          : MAX_RECONNECTIONS_PER_PULSE;
1739        for( i=0; i<nAdd && i<nCandidates && i<MAX_RECONNECTIONS_PER_PULSE; ++i )
1740        {
1741            tr_peerMgr * mgr = t->manager;
1742            struct peer_atom * atom = candidates[i];
1743            tr_peerIo * io;
1744            tr_handshake * handshake;
1745
1746            tordbg( t, "Starting an OUTGOING connection with %s",
1747                       tr_peerIoAddrStr( &atom->addr, atom->port ) );
1748
1749            io = tr_peerIoNewOutgoing( mgr->handle, &atom->addr, atom->port, t->hash );
1750
1751            handshake = tr_handshakeNew( io,
1752                                         mgr->handle->encryptionMode,
1753                                         myHandshakeDoneCB,
1754                                         mgr );
1755
1756            assert( tr_peerIoGetTorrentHash( io ) != NULL );
1757
1758            tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake, handshakeCompare );
1759
1760            atom->time = time( NULL );
1761        }
1762
1763        /* cleanup */
1764        tr_free( connections );
1765        tr_free( candidates );
1766    }
1767
1768    torrentUnlock( t );
1769    return TRUE;
1770}
Note: See TracBrowser for help on using the repository browser.