source: trunk/libtransmission/peer-mgr.c @ 3753

Last change on this file since 3753 was 3753, checked in by charles, 15 years ago

update our #includes now that libevent has cleaned up event.h

  • Property svn:keywords set to Date Rev Author Id
File size: 47.8 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 3753 2007-11-07 18:26:19Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17#include <limits.h> /* INT_MAX */
18
19#include <libgen.h> /* basename */
20#include <arpa/inet.h> /* inet_ntoa */
21
22#include <event.h>
23
24#include "transmission.h"
25#include "clients.h"
26#include "completion.h"
27#include "crypto.h"
28#include "handshake.h"
29#include "net.h"
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-mgr-private.h"
33#include "peer-msgs.h"
34#include "platform.h"
35#include "ptrarray.h"
36#include "ratecontrol.h"
37#include "shared.h"
38#include "trevent.h"
39#include "utils.h"
40
41enum
42{
43    /* how frequently to change which peers are choked */
44    RECHOKE_PERIOD_MSEC = (1000),
45
46    /* how frequently to decide which peers live and die */
47    RECONNECT_PERIOD_MSEC = (5 * 1000),
48
49    /* how frequently to refill peers' request lists */
50    REFILL_PERIOD_MSEC = 666,
51
52    /* don't change a peer's choke status more often than this */
53    MIN_CHOKE_PERIOD_SEC = 10,
54
55    /* following the BT spec, we consider ourselves `snubbed' if
56     * we're we don't get piece data from a peer in this long */
57    SNUBBED_SEC = 60,
58
59    /* arbitrary */
60    MAX_CONNECTED_PEERS_PER_TORRENT = 60,
61
62    /* when many peers are available, keep idle ones this long */
63    MIN_UPLOAD_IDLE_SECS = 60,
64
65    /* when few peers are available, keep idle ones this long */
66    MAX_UPLOAD_IDLE_SECS = 240,
67
68    /* how many peers to unchoke per-torrent. */
69    /* FIXME: make this user-configurable? */
70    NUM_UNCHOKED_PEERS_PER_TORRENT = 12, /* arbitrary */
71
72    /* set this too high and there will be a lot of churn.
73     * set it too low and you'll get peers too slowly */
74    MAX_RECONNECTIONS_PER_PULSE = 10,
75
76    /* corresponds to ut_pex's added.f flags */
77    ADDED_F_ENCRYPTION_FLAG = 1,
78    /* corresponds to ut_pex's added.f flags */
79    ADDED_F_SEED_FLAG = 2,
80
81    /* number of bad pieces a peer is allowed to send before we ban them */
82    MAX_BAD_PIECES_PER_PEER = 3,
83    /* use for bitwise operations w/peer_atom.myflags */
84    MYFLAG_BANNED = 1
85};
86
87
88/**
89***
90**/
91
92/* We keep one of these for every peer we know about, whether
93 * it's connected or not, so the struct must be small.
94 * When our current connections underperform, we dip back
95 * into this list for new ones. */
96struct peer_atom
97{   
98    uint8_t from;
99    uint8_t flags; /* these match the added_f flags */
100    uint8_t myflags; /* flags that aren't defined in added_f */
101    uint16_t port;
102    uint16_t numFails;
103    struct in_addr addr;
104    time_t time;
105};
106
107typedef struct
108{
109    uint8_t hash[SHA_DIGEST_LENGTH];
110    tr_ptrArray * outgoingHandshakes; /* tr_handshake */
111    tr_ptrArray * pool; /* struct peer_atom */
112    tr_ptrArray * peers; /* tr_peer */
113    tr_timer * reconnectTimer;
114    tr_timer * rechokeTimer;
115    tr_timer * refillTimer;
116    tr_torrent * tor;
117    tr_bitfield * requested;
118
119    unsigned int isRunning : 1;
120
121    struct tr_peerMgr * manager;
122}
123Torrent;
124
125struct tr_peerMgr
126{
127    tr_handle * handle;
128    tr_ptrArray * torrents; /* Torrent */
129    tr_ptrArray * incomingHandshakes; /* tr_handshake */
130};
131
132/**
133***
134**/
135
136static void
137myDebug( const char * file, int line, const Torrent * t, const char * fmt, ... )
138{
139    FILE * fp = tr_getLog( );
140    if( fp != NULL )
141    {
142        va_list args;
143        char timestr[64];
144        struct evbuffer * buf = evbuffer_new( );
145        char * myfile = tr_strdup( file );
146
147        evbuffer_add_printf( buf, "[%s] ", tr_getLogTimeStr( timestr, sizeof(timestr) ) );
148        if( t != NULL )
149            evbuffer_add_printf( buf, "%s ", t->tor->info.name );
150        va_start( args, fmt );
151        evbuffer_add_vprintf( buf, fmt, args );
152        va_end( args );
153        evbuffer_add_printf( buf, " (%s:%d)\n", basename(myfile), line );
154        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
155
156        tr_free( myfile );
157        evbuffer_free( buf );
158    }
159}
160
161#define tordbg(t, fmt...) myDebug(__FILE__, __LINE__, t, ##fmt )
162
163/**
164***
165**/
166
167static void
168managerLock( struct tr_peerMgr * manager )
169{
170    tr_globalLock( manager->handle );
171}
172static void
173managerUnlock( struct tr_peerMgr * manager )
174{
175    tr_globalUnlock( manager->handle );
176}
177static void
178torrentLock( Torrent * torrent )
179{
180    managerLock( torrent->manager );
181}
182static void
183torrentUnlock( Torrent * torrent )
184{
185    managerUnlock( torrent->manager );
186}
187static int
188torrentIsLocked( const Torrent * t )
189{
190    return tr_globalIsLocked( t->manager->handle );
191}
192
193/**
194***
195**/
196
197static int
198compareAddresses( const struct in_addr * a, const struct in_addr * b )
199{
200    return tr_compareUint32( a->s_addr, b->s_addr );
201}
202
203static int
204handshakeCompareToAddr( const void * va, const void * vb )
205{
206    const tr_handshake * a = va;
207    return compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
208}
209
210static int
211handshakeCompare( const void * a, const void * b )
212{
213    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
214}
215
216static tr_handshake*
217getExistingHandshake( tr_ptrArray * handshakes, const struct in_addr * in_addr )
218{
219    return tr_ptrArrayFindSorted( handshakes,
220                                  in_addr,
221                                  handshakeCompareToAddr );
222}
223
224static int
225comparePeerAtomToAddress( const void * va, const void * vb )
226{
227    const struct peer_atom * a = va;
228    return compareAddresses( &a->addr, vb );
229}
230
231static int
232comparePeerAtoms( const void * va, const void * vb )
233{
234    const struct peer_atom * b = vb;
235    return comparePeerAtomToAddress( va, &b->addr );
236}
237
238/**
239***
240**/
241
242static int
243torrentCompare( const void * va, const void * vb )
244{
245    const Torrent * a = va;
246    const Torrent * b = vb;
247    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
248}
249
250static int
251torrentCompareToHash( const void * va, const void * vb )
252{
253    const Torrent * a = va;
254    const uint8_t * b_hash = vb;
255    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
256}
257
258static Torrent*
259getExistingTorrent( tr_peerMgr * manager, const uint8_t * hash )
260{
261    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
262                                             hash,
263                                             torrentCompareToHash );
264}
265
266static int
267peerCompare( const void * va, const void * vb )
268{
269    const tr_peer * a = va;
270    const tr_peer * b = vb;
271    return compareAddresses( &a->in_addr, &b->in_addr );
272}
273
274static int
275peerCompareToAddr( const void * va, const void * vb )
276{
277    const tr_peer * a = va;
278    return compareAddresses( &a->in_addr, vb );
279}
280
281static tr_peer*
282getExistingPeer( Torrent * torrent, const struct in_addr * in_addr )
283{
284    assert( torrentIsLocked( torrent ) );
285    assert( in_addr != NULL );
286
287    return (tr_peer*) tr_ptrArrayFindSorted( torrent->peers,
288                                             in_addr,
289                                             peerCompareToAddr );
290}
291
292static struct peer_atom*
293getExistingAtom( const Torrent * t, const struct in_addr * addr )
294{
295    assert( torrentIsLocked( t ) );
296    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
297}
298
299static int
300peerIsInUse( const Torrent * ct, const struct in_addr * addr )
301{
302    Torrent * t = (Torrent*) ct;
303
304    assert( torrentIsLocked ( t ) );
305
306    return getExistingPeer( t, addr )
307        || getExistingHandshake( t->outgoingHandshakes, addr )
308        || getExistingHandshake( t->manager->incomingHandshakes, addr );
309}
310
311static tr_peer*
312peerConstructor( const struct in_addr * in_addr )
313{
314    tr_peer * p;
315    p = tr_new0( tr_peer, 1 );
316    p->rcToClient = tr_rcInit( );
317    p->rcToPeer = tr_rcInit( );
318    memcpy( &p->in_addr, in_addr, sizeof(struct in_addr) );
319    return p;
320}
321
322static tr_peer*
323getPeer( Torrent * torrent, const struct in_addr * in_addr )
324{
325    tr_peer * peer;
326
327    assert( torrentIsLocked( torrent ) );
328
329    peer = getExistingPeer( torrent, in_addr );
330
331    if( peer == NULL ) {
332        peer = peerConstructor( in_addr );
333        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
334    }
335
336    return peer;
337}
338
339static void
340peerDestructor( tr_peer * peer )
341{
342    assert( peer != NULL );
343    assert( peer->msgs != NULL );
344
345    tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
346    tr_peerMsgsFree( peer->msgs );
347
348    tr_peerIoFree( peer->io );
349
350    tr_bitfieldFree( peer->have );
351    tr_bitfieldFree( peer->blame );
352    tr_rcClose( peer->rcToClient );
353    tr_rcClose( peer->rcToPeer );
354    tr_free( peer->client );
355    tr_free( peer );
356}
357
358static void
359removePeer( Torrent * t, tr_peer * peer )
360{
361    tr_peer * removed;
362    struct peer_atom * atom;
363
364    assert( torrentIsLocked( t ) );
365
366    atom = getExistingAtom( t, &peer->in_addr );
367    assert( atom != NULL );
368    atom->time = time( NULL );
369
370    removed = tr_ptrArrayRemoveSorted  ( t->peers, peer, peerCompare );
371    assert( removed == peer );
372    peerDestructor( removed );
373}
374
375static void
376removeAllPeers( Torrent * t )
377{
378    while( !tr_ptrArrayEmpty( t->peers ) )
379        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
380}
381
382static void
383torrentDestructor( Torrent * t )
384{
385    uint8_t hash[SHA_DIGEST_LENGTH];
386
387    assert( t != NULL );
388    assert( !t->isRunning );
389    assert( t->peers != NULL );
390    assert( torrentIsLocked( t ) );
391    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
392    assert( tr_ptrArrayEmpty( t->peers ) );
393
394    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
395
396    tr_timerFree( &t->reconnectTimer );
397    tr_timerFree( &t->rechokeTimer );
398    tr_timerFree( &t->refillTimer );
399
400    tr_bitfieldFree( t->requested );
401    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
402    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
403    tr_ptrArrayFree( t->peers, NULL );
404
405    tr_free( t );
406}
407
408static Torrent*
409torrentConstructor( tr_peerMgr * manager, tr_torrent * tor )
410{
411    Torrent * t;
412
413    t = tr_new0( Torrent, 1 );
414    t->manager = manager;
415    t->tor = tor;
416    t->pool = tr_ptrArrayNew( );
417    t->peers = tr_ptrArrayNew( );
418    t->outgoingHandshakes = tr_ptrArrayNew( );
419    t->requested = tr_bitfieldNew( tor->blockCount );
420    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
421
422    return t;
423}
424
425/**
426***
427**/
428
429struct tr_bitfield *
430tr_peerMgrGenerateAllowedSet( const uint32_t         setCount,
431                              const uint32_t         pieceCount,
432                              const uint8_t          infohash[20],
433                              const struct in_addr * ip )
434{
435    /* This has been checked against the spec example implementation. Feeding it with :
436    setCount = 9, pieceCount = 1313, infohash = Oxaa,0xaa,...0xaa, ip = 80.4.4.200
437generate :
438    1059, 431, 808, 1217, 287, 376, 1188, 353, 508
439    but since we're storing in a bitfield, it won't be in this order... */
440    /* TODO : We should translate link-local IPv4 adresses to external IP,
441     * so that being on same local network gives us the same allowed pieces */
442   
443    uint8_t *seed;
444    char buf[4];
445    uint32_t allowedPieceCount = 0;
446    tr_bitfield_t * ret;
447
448    printf( "%d piece allowed fast set for torrent with %d pieces and hex infohash\n", setCount, pieceCount );
449    printf( "%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x for node with IP %s:\n",
450            infohash[0], infohash[1], infohash[2], infohash[3], infohash[4], infohash[5], infohash[6], infohash[7], infohash[8], infohash[9],
451            infohash[10], infohash[11], infohash[12], infohash[13], infohash[14], infohash[15], infohash[16], infohash[7], infohash[18], infohash[19],
452            inet_ntoa( *ip ) );
453   
454    seed = malloc(4 + SHA_DIGEST_LENGTH);
455   
456    ret = tr_bitfieldNew( pieceCount );
457   
458    /* We need a seed based on most significant bytes of peer address
459        concatenated with torrent's infohash */
460    *(uint32_t*)buf = ntohl( htonl(ip->s_addr) & 0xffffff00 );
461   
462    memcpy( seed, &buf, 4 );
463    memcpy( seed + 4, infohash, SHA_DIGEST_LENGTH );
464   
465    tr_sha1( seed, seed, 4 + SHA_DIGEST_LENGTH, NULL );
466   
467    while ( allowedPieceCount < setCount )
468    {
469        int i;
470        for ( i = 0 ; i < 5 && allowedPieceCount < setCount ; i++ )
471        {
472            /* We generate indices from 4-byte chunks of the seed */
473            uint32_t j = i * 4;
474            uint32_t y = ntohl( *(uint32_t*)(seed + j) );
475            uint32_t index = y % pieceCount;
476           
477            if ( !tr_bitfieldHas( ret, index ) )
478            {
479                tr_bitfieldAdd( ret, index );
480                allowedPieceCount++;
481            }
482        }
483        /* We randomize the seed, in case we need to iterate more */
484        tr_sha1( seed, seed, SHA_DIGEST_LENGTH, NULL );
485    }
486    tr_free( seed );
487   
488    return ret;
489}
490
491tr_peerMgr*
492tr_peerMgrNew( tr_handle * handle )
493{
494    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
495    m->handle = handle;
496    m->torrents = tr_ptrArrayNew( );
497    m->incomingHandshakes = tr_ptrArrayNew( );
498    return m;
499}
500
501void
502tr_peerMgrFree( tr_peerMgr * manager )
503{
504    managerLock( manager );
505
506    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
507     * the item from manager->handshakes, so this is a little roundabout... */
508    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
509        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
510    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
511
512    /* free the torrents. */
513    tr_ptrArrayFree( manager->torrents, (PtrArrayForeachFunc)torrentDestructor );
514
515    managerUnlock( manager );
516    tr_free( manager );
517}
518
519static tr_peer**
520getConnectedPeers( Torrent * t, int * setmeCount )
521{
522    int i, peerCount, connectionCount;
523    tr_peer **peers;
524    tr_peer **ret;
525
526    assert( torrentIsLocked( t ) );
527
528    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
529    ret = tr_new( tr_peer*, peerCount );
530
531    for( i=connectionCount=0; i<peerCount; ++i )
532        if( peers[i]->msgs != NULL )
533            ret[connectionCount++] = peers[i];
534
535    *setmeCount = connectionCount;
536    return ret;
537}
538
539/***
540****  Refill
541***/
542
543struct tr_refill_piece
544{
545    tr_priority_t priority;
546    int percentDone;
547    uint16_t random;
548    uint32_t piece;
549    uint32_t peerCount;
550    uint32_t fastAllowed;
551};
552
553static int
554compareRefillPiece (const void * aIn, const void * bIn)
555{
556    const struct tr_refill_piece * a = aIn;
557    const struct tr_refill_piece * b = bIn;
558   
559    /* if one *might be* fastallowed to us, get it first...
560     * I'm putting it on top so we prioritise those pieces at
561     * startup, then we'll have them, and we'll be denied access
562     * to them */
563    if (a->fastAllowed != b->fastAllowed)
564        return a->fastAllowed < b->fastAllowed ? -1 : 1;
565   
566    /* if one piece has a higher priority, it goes first */
567    if (a->priority != b->priority)
568        return a->priority > b->priority ? -1 : 1;
569
570    /* try to fill partial pieces */
571    if( a->percentDone != b->percentDone )
572        return a->percentDone > b->percentDone ? -1 : 1;
573   
574    /* otherwise if one has fewer peers, it goes first */
575    if (a->peerCount != b->peerCount)
576        return a->peerCount < b->peerCount ? -1 : 1;
577
578    /* otherwise go with our random seed */
579    return tr_compareUint16( a->random, b->random );
580}
581
582static int
583isPieceInteresting( const tr_torrent  * tor,
584                    int                 piece )
585{
586    if( tor->info.pieces[piece].dnd ) /* we don't want it */
587        return 0;
588
589    if( tr_cpPieceIsComplete( tor->completion, piece ) ) /* we have it */
590        return 0;
591
592    return 1;
593}
594
595static uint32_t*
596getPreferredPieces( Torrent     * t,
597                    uint32_t    * pieceCount )
598{
599    const tr_torrent * tor = t->tor;
600    const tr_info * inf = &tor->info;
601    int i;
602    uint32_t poolSize = 0;
603    uint32_t * pool = tr_new( uint32_t, inf->pieceCount );
604    int peerCount;
605    tr_peer** peers;
606
607    assert( torrentIsLocked( t ) );
608
609    peers = getConnectedPeers( t, &peerCount );
610
611    for( i=0; i<inf->pieceCount; ++i )
612        if( isPieceInteresting( tor, i ) )
613            pool[poolSize++] = i;
614
615    /* sort the pool from most interesting to least... */
616    if( poolSize > 1 )
617    {
618        uint32_t j;
619        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
620
621        for( j=0; j<poolSize; ++j )
622        {
623            int k;
624            const int piece = pool[j];
625            struct tr_refill_piece * setme = p + j;
626
627            setme->piece = piece;
628            setme->priority = inf->pieces[piece].priority;
629            setme->peerCount = 0;
630            setme->fastAllowed = 0;
631            setme->random = tr_rand( UINT16_MAX );
632            setme->percentDone = (int)( 100.0 * tr_cpPercentBlocksInPiece( tor->completion, piece ) );
633
634            for( k=0; k<peerCount; ++k ) {
635                const tr_peer * peer = peers[k];
636                if( peer->peerIsInterested && !peer->clientIsChoked && tr_bitfieldHas( peer->have, piece ) )
637                    ++setme->peerCount;
638                /* The fast peer extension doesn't force a peer to actually HAVE a fast-allowed piece,
639                    but we're guaranteed to get the same pieces from different peers,
640                    so we'll build a list and pray one actually have this one */
641                setme->fastAllowed = tr_peerMsgIsPieceFastAllowed( peer->msgs, i);
642            }
643        }
644
645        qsort( p, poolSize, sizeof(struct tr_refill_piece), compareRefillPiece );
646
647        for( j=0; j<poolSize; ++j )
648            pool[j] = p[j].piece;
649
650        tr_free( p );
651    }
652
653    tr_free( peers );
654
655    *pieceCount = poolSize;
656    return pool;
657}
658
659static uint64_t*
660getPreferredBlocks( Torrent * t, uint64_t * setmeCount )
661{
662    uint32_t i;
663    uint32_t pieceCount;
664    uint32_t * pieces;
665    uint64_t *req, *unreq, *ret, *walk;
666    int reqCount, unreqCount;
667    const tr_torrent * tor = t->tor;
668
669    assert( torrentIsLocked( t ) );
670
671    pieces = getPreferredPieces( t, &pieceCount );
672
673    req = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
674    reqCount = 0;
675    unreq = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
676    unreqCount = 0;
677
678    for( i=0; i<pieceCount; ++i ) {
679        const uint32_t index = pieces[i];
680        const int begin = tr_torPieceFirstBlock( tor, index );
681        const int end = begin + tr_torPieceCountBlocks( tor, (int)index );
682        int block;
683        for( block=begin; block<end; ++block )
684            if( tr_cpBlockIsComplete( tor->completion, block ) )
685                continue;
686            else if( tr_bitfieldHas( t->requested, block ) )
687                req[reqCount++] = block;
688            else
689                unreq[unreqCount++] = block;
690    }
691
692    ret = walk = tr_new( uint64_t, unreqCount + reqCount );
693    memcpy( walk, unreq, sizeof(uint64_t) * unreqCount );
694    walk += unreqCount;
695    memcpy( walk, req, sizeof(uint64_t) * reqCount );
696    walk += reqCount;
697    assert( ( walk - ret ) == ( unreqCount + reqCount ) );
698    *setmeCount = walk - ret;
699
700    tr_free( req );
701    tr_free( unreq );
702    tr_free( pieces );
703
704    return ret;
705}
706
707static int
708refillPulse( void * vtorrent )
709{
710    Torrent * t = vtorrent;
711    tr_torrent * tor = t->tor;
712    uint32_t i;
713    int peerCount;
714    tr_peer ** peers;
715    uint64_t blockCount;
716    uint64_t * blocks;
717
718    if( !t->isRunning )
719        return TRUE;
720    if( tr_torrentIsSeed( t->tor ) )
721        return TRUE;
722
723    torrentLock( t );
724    tordbg( t, "Refilling Request Buffers..." );
725
726    blocks = getPreferredBlocks( t, &blockCount );
727    peers = getConnectedPeers( t, &peerCount );
728
729    for( i=0; peerCount && i<blockCount; ++i )
730    {
731        const uint64_t block = blocks[i];
732        const uint32_t index = tr_torBlockPiece( tor, block );
733        const uint32_t begin = (block * tor->blockSize) - (index * tor->info.pieceSize);
734        const uint32_t length = tr_torBlockCountBytes( tor, (int)block );
735        int j;
736        assert( _tr_block( tor, index, begin ) == (int)block );
737        assert( begin < (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
738        assert( (begin + length) <= (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
739
740        /* find a peer who can ask for this block */
741        for( j=0; j<peerCount; )
742        {
743            const int val = tr_peerMsgsAddRequest( peers[j]->msgs, index, begin, length );
744            switch( val )
745            {
746                case TR_ADDREQ_FULL: 
747                case TR_ADDREQ_CLIENT_CHOKED:
748                    memmove( peers+j, peers+j+1, sizeof(tr_peer*)*(--peerCount-j) );
749                    break;
750
751                case TR_ADDREQ_MISSING: 
752                case TR_ADDREQ_DUPLICATE: 
753                    ++j;
754                    break;
755
756                case TR_ADDREQ_OK:
757                    tr_bitfieldAdd( t->requested, block );
758                    j = peerCount;
759                    break;
760
761                default:
762                    assert( 0 && "unhandled value" );
763                    break;
764            }
765        }
766    }
767
768    /* cleanup */
769    tr_free( peers );
770    tr_free( blocks );
771
772    t->refillTimer = NULL;
773    torrentUnlock( t );
774    return FALSE;
775}
776
777static void
778broadcastClientHave( Torrent * t, uint32_t index )
779{
780    int i, size;
781    tr_peer ** peers;
782
783    assert( torrentIsLocked( t ) );
784
785    peers = getConnectedPeers( t, &size );
786    for( i=0; i<size; ++i )
787        tr_peerMsgsHave( peers[i]->msgs, index );
788    tr_free( peers );
789}
790
791static void
792broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
793{
794    int i, size;
795    tr_peer ** peers;
796
797    assert( torrentIsLocked( t ) );
798
799    peers = getConnectedPeers( t, &size );
800    for( i=0; i<size; ++i )
801        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
802    tr_free( peers );
803}
804
805static void
806msgsCallbackFunc( void * vpeer, void * vevent, void * vt )
807{
808    tr_peer * peer = vpeer;
809    Torrent * t = (Torrent *) vt;
810    const tr_peermsgs_event * e = (const tr_peermsgs_event *) vevent;
811
812    torrentLock( t );
813
814    switch( e->eventType )
815    {
816        case TR_PEERMSG_NEED_REQ:
817            if( t->refillTimer == NULL )
818                t->refillTimer = tr_timerNew( t->manager->handle,
819                                              refillPulse, t,
820                                              REFILL_PERIOD_MSEC );
821            break;
822
823        case TR_PEERMSG_CANCEL:
824            tr_bitfieldRem( t->requested, _tr_block( t->tor, e->pieceIndex, e->offset ) );
825            break;
826
827        case TR_PEERMSG_CLIENT_HAVE:
828            broadcastClientHave( t, e->pieceIndex );
829            tr_torrentRecheckCompleteness( t->tor );
830            break;
831
832        case TR_PEERMSG_PEER_PROGRESS: {
833            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
834            const int peerIsSeed = e->progress >= 1.0;
835            if( peerIsSeed ) {
836                tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr(&atom->addr,atom->port) );
837                atom->flags |= ADDED_F_SEED_FLAG;
838            } else {
839                tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr(&atom->addr,atom->port) );
840                atom->flags &= ~ADDED_F_SEED_FLAG;
841            } break;
842        }
843
844        case TR_PEERMSG_CLIENT_BLOCK:
845            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
846            break;
847
848        case TR_PEERMSG_GOT_ERROR:
849            peer->doPurge = 1;
850            break;
851
852        default:
853            assert(0);
854    }
855
856    torrentUnlock( t );
857}
858
859static void
860ensureAtomExists( Torrent * t, const struct in_addr * addr, uint16_t port, uint8_t flags, uint8_t from )
861{
862    if( getExistingAtom( t, addr ) == NULL )
863    {
864        struct peer_atom * a;
865        a = tr_new0( struct peer_atom, 1 );
866        a->addr = *addr;
867        a->port = port;
868        a->flags = flags;
869        a->from = from;
870        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr(&a->addr,a->port) );
871        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
872    }
873}
874
875/* FIXME: this is kind of a mess. */
876static void
877myHandshakeDoneCB( tr_handshake    * handshake,
878                   tr_peerIo       * io,
879                   int               isConnected,
880                   const uint8_t   * peer_id,
881                   void            * vmanager )
882{
883    int ok = isConnected;
884    uint16_t port;
885    const struct in_addr * addr;
886    tr_peerMgr * manager = (tr_peerMgr*) vmanager;
887    Torrent * t;
888    tr_handshake * ours;
889
890    assert( io != NULL );
891    assert( isConnected==0 || isConnected==1 );
892
893    t = tr_peerIoHasTorrentHash( io )
894        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
895        : NULL;
896
897    if( tr_peerIoIsIncoming ( io ) )
898        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
899                                        handshake, handshakeCompare );
900    else if( t != NULL )
901        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
902                                        handshake, handshakeCompare );
903    else
904        ours = handshake;
905
906    assert( ours != NULL );
907    assert( ours == handshake );
908
909    if( t != NULL )
910        torrentLock( t );
911
912    addr = tr_peerIoGetAddress( io, &port );
913
914    if( !ok || !t || !t->isRunning )
915    {
916        tr_peerIoFree( io );
917    }
918    else /* looking good */
919    {
920        struct peer_atom * atom;
921        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
922        atom = getExistingAtom( t, addr );
923
924        if( atom->myflags & MYFLAG_BANNED )
925        {
926            tordbg( t, "banned peer %s tried to reconnect", tr_peerIoAddrStr(&atom->addr,atom->port) );
927            tr_peerIoFree( io );
928        }
929        else
930        {
931            tr_peer * peer = getExistingPeer( t, addr );
932
933            if( peer != NULL ) /* we already have this peer */
934            {
935                tr_peerIoFree( io );
936            }
937            else
938            {
939                peer = getPeer( t, addr );
940                tr_free( peer->client );
941                peer->client = peer_id ? tr_clientForId( peer_id ) : NULL;
942                peer->port = port;
943                peer->io = io;
944                peer->msgs = tr_peerMsgsNew( t->tor, peer, msgsCallbackFunc, t, &peer->msgsTag );
945                atom->time = time( NULL );
946            }
947        }
948    }
949
950    if( t != NULL )
951        torrentUnlock( t );
952}
953
954void
955tr_peerMgrAddIncoming( tr_peerMgr      * manager,
956                       struct in_addr  * addr,
957                       uint16_t          port,
958                       int               socket )
959{
960    managerLock( manager );
961
962    if( getExistingHandshake( manager->incomingHandshakes, addr ) )
963    {
964        tr_netClose( socket );
965    }
966    else /* we don't have a connetion to them yet... */
967    {
968        tr_peerIo * io;
969        tr_handshake * handshake;
970
971        tordbg( NULL, "Got an INCOMING connection with %s", tr_peerIoAddrStr( addr, port ) );
972
973        io = tr_peerIoNewIncoming( manager->handle, addr, port, socket );
974
975        handshake = tr_handshakeNew( io,
976                                     manager->handle->encryptionMode,
977                                     myHandshakeDoneCB,
978                                     manager );
979
980        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake, handshakeCompare );
981    }
982
983    managerUnlock( manager );
984}
985
986void
987tr_peerMgrAddPex( tr_peerMgr     * manager,
988                  const uint8_t  * torrentHash,
989                  uint8_t          from,
990                  const tr_pex   * pex,
991                  int              pexCount )
992{
993    Torrent * t;
994    const tr_pex * end;
995
996    managerLock( manager );
997
998    t = getExistingTorrent( manager, torrentHash );
999    for( end=pex+pexCount; pex!=end; ++pex )
1000        ensureAtomExists( t, &pex->in_addr, pex->port, pex->flags, from );
1001
1002    managerUnlock( manager );
1003}
1004
1005void
1006tr_peerMgrAddPeers( tr_peerMgr    * manager,
1007                    const uint8_t * torrentHash,
1008                    uint8_t         from,
1009                    const uint8_t * peerCompact,
1010                    int             peerCount )
1011{
1012    int i;
1013    const uint8_t * walk = peerCompact;
1014    Torrent * t;
1015
1016    managerLock( manager );
1017
1018    t = getExistingTorrent( manager, torrentHash );
1019    for( i=0; t!=NULL && i<peerCount; ++i )
1020    {
1021        struct in_addr addr;
1022        uint16_t port;
1023        memcpy( &addr, walk, 4 ); walk += 4;
1024        memcpy( &port, walk, 2 ); walk += 2;
1025        ensureAtomExists( t, &addr, port, 0, from );
1026    }
1027
1028    managerUnlock( manager );
1029}
1030
1031/**
1032***
1033**/
1034
1035void
1036tr_peerMgrSetBlame( tr_peerMgr     * manager,
1037                    const uint8_t  * torrentHash,
1038                    int              pieceIndex,
1039                    int              success )
1040{
1041    if( !success )
1042    {
1043        int peerCount, i;
1044        Torrent * t = getExistingTorrent( manager, torrentHash );
1045        tr_peer ** peers;
1046
1047        assert( torrentIsLocked( t ) );
1048
1049        peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1050        for( i=0; i<peerCount; ++i )
1051        {
1052            struct peer_atom * atom;
1053            tr_peer * peer;
1054
1055            peer = peers[i];
1056            if( !tr_bitfieldHas( peer->blame, pieceIndex ) )
1057                continue;
1058
1059            ++peer->strikes;
1060            tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1061                       tr_peerIoAddrStr(&peer->in_addr,peer->port),
1062                       pieceIndex, (int)peer->strikes );
1063            if( peer->strikes < MAX_BAD_PIECES_PER_PEER )
1064                continue;
1065
1066            atom = getExistingAtom( t, &peer->in_addr );
1067            atom->myflags |= MYFLAG_BANNED;
1068            peer->doPurge = 1;
1069            tordbg( t, "banning peer %s due to corrupt data", tr_peerIoAddrStr(&atom->addr,atom->port) );
1070        }
1071    }
1072}
1073
1074int
1075tr_pexCompare( const void * va, const void * vb )
1076{
1077    const tr_pex * a = (const tr_pex *) va;
1078    const tr_pex * b = (const tr_pex *) vb;
1079    int i = memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
1080    if( i ) return i;
1081    if( a->port < b->port ) return -1;
1082    if( a->port > b->port ) return 1;
1083    return 0;
1084}
1085
1086int tr_pexCompare( const void * a, const void * b );
1087
1088static int
1089peerPrefersCrypto( const tr_peer * peer )
1090{
1091    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1092        return TRUE;
1093
1094    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1095        return FALSE;
1096
1097    return tr_peerIoIsEncrypted( peer->io );
1098};
1099
1100int
1101tr_peerMgrGetPeers( tr_peerMgr      * manager,
1102                    const uint8_t   * torrentHash,
1103                    tr_pex         ** setme_pex )
1104{
1105    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1106    int i, peerCount;
1107    const tr_peer ** peers;
1108    tr_pex * pex;
1109    tr_pex * walk;
1110
1111    torrentLock( (Torrent*)t );
1112
1113    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1114    pex = walk = tr_new( tr_pex, peerCount );
1115
1116    for( i=0; i<peerCount; ++i, ++walk )
1117    {
1118        const tr_peer * peer = peers[i];
1119
1120        walk->in_addr = peer->in_addr;
1121
1122        walk->port = peer->port;
1123
1124        walk->flags = 0;
1125        if( peerPrefersCrypto(peer) )  walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1126        if( peer->progress >= 1.0 )    walk->flags |= ADDED_F_SEED_FLAG;
1127    }
1128
1129    assert( ( walk - pex ) == peerCount );
1130    qsort( pex, peerCount, sizeof(tr_pex), tr_pexCompare );
1131    *setme_pex = pex;
1132
1133    torrentUnlock( (Torrent*)t );
1134
1135    return peerCount;
1136}
1137
1138static int reconnectPulse( void * vtorrent );
1139static int rechokePulse( void * vtorrent );
1140
1141void
1142tr_peerMgrStartTorrent( tr_peerMgr     * manager,
1143                        const uint8_t  * torrentHash )
1144{
1145    Torrent * t;
1146
1147    managerLock( manager );
1148
1149    t = getExistingTorrent( manager, torrentHash );
1150
1151    assert( t != NULL );
1152    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1153    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1154
1155    if( !t->isRunning )
1156    {
1157        t->isRunning = 1;
1158
1159        t->reconnectTimer = tr_timerNew( t->manager->handle,
1160                                         reconnectPulse, t,
1161                                         RECONNECT_PERIOD_MSEC );
1162
1163        t->rechokeTimer = tr_timerNew( t->manager->handle,
1164                                       rechokePulse, t,
1165                                       RECHOKE_PERIOD_MSEC );
1166
1167        reconnectPulse( t );
1168
1169        rechokePulse( t );
1170    }
1171
1172    managerUnlock( manager );
1173}
1174
1175static void
1176stopTorrent( Torrent * t )
1177{
1178    assert( torrentIsLocked( t ) );
1179
1180    t->isRunning = 0;
1181    tr_timerFree( &t->rechokeTimer );
1182    tr_timerFree( &t->reconnectTimer );
1183
1184    /* disconnect the peers. */
1185    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)peerDestructor );
1186    tr_ptrArrayClear( t->peers );
1187
1188    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1189     * which removes the handshake from t->outgoingHandshakes... */
1190    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1191        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1192}
1193void
1194tr_peerMgrStopTorrent( tr_peerMgr     * manager,
1195                       const uint8_t  * torrentHash)
1196{
1197    managerLock( manager );
1198
1199    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1200
1201    managerUnlock( manager );
1202}
1203
1204void
1205tr_peerMgrAddTorrent( tr_peerMgr * manager,
1206                      tr_torrent * tor )
1207{
1208    Torrent * t;
1209
1210    managerLock( manager );
1211
1212    assert( tor != NULL );
1213    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1214
1215    t = torrentConstructor( manager, tor );
1216    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1217
1218    managerUnlock( manager );
1219}
1220
1221void
1222tr_peerMgrRemoveTorrent( tr_peerMgr     * manager,
1223                         const uint8_t  * torrentHash )
1224{
1225    Torrent * t;
1226
1227    managerLock( manager );
1228
1229    t = getExistingTorrent( manager, torrentHash );
1230    assert( t != NULL );
1231    stopTorrent( t );
1232    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1233    torrentDestructor( t );
1234
1235    managerUnlock( manager );
1236}
1237
1238void
1239tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1240                               const uint8_t    * torrentHash,
1241                               int8_t           * tab,
1242                               int                tabCount )
1243{
1244    int i;
1245    const Torrent * t;
1246    const tr_torrent * tor;
1247    float interval;
1248
1249    managerLock( (tr_peerMgr*)manager );
1250
1251    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1252    tor = t->tor;
1253    interval = tor->info.pieceCount / (float)tabCount;
1254
1255    memset( tab, 0, tabCount );
1256
1257    for( i=0; i<tabCount; ++i )
1258    {
1259        const int piece = i * interval;
1260
1261        if( tor == NULL )
1262            tab[i] = 0;
1263        else if( tr_cpPieceIsComplete( tor->completion, piece ) )
1264            tab[i] = -1;
1265        else {
1266            int j, peerCount;
1267            const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1268            for( j=0; j<peerCount; ++j )
1269                if( tr_bitfieldHas( peers[j]->have, i ) )
1270                    ++tab[i];
1271        }
1272    }
1273
1274    managerUnlock( (tr_peerMgr*)manager );
1275}
1276
1277/* Returns the pieces that we and/or a connected peer has */
1278tr_bitfield*
1279tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1280                        const uint8_t    * torrentHash )
1281{
1282    int i, size;
1283    const Torrent * t;
1284    const tr_peer ** peers;
1285    tr_bitfield * pieces;
1286
1287    managerLock( (tr_peerMgr*)manager );
1288
1289    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1290    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1291    pieces = tr_bitfieldDup( tr_cpPieceBitfield( t->tor->completion ) );
1292    for( i=0; i<size; ++i )
1293        if( peers[i]->io != NULL )
1294            tr_bitfieldAnd( pieces, peers[i]->have );
1295
1296    managerUnlock( (tr_peerMgr*)manager );
1297    return pieces;
1298}
1299
1300int
1301tr_peerMgrHasConnections( const tr_peerMgr * manager,
1302                          const uint8_t    * torrentHash )
1303{
1304    int ret;
1305    const Torrent * t;
1306    managerLock( (tr_peerMgr*)manager );
1307
1308    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1309    ret = t && tr_ptrArraySize( t->peers );
1310
1311    managerUnlock( (tr_peerMgr*)manager );
1312    return ret;
1313}
1314
1315void
1316tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1317                        const uint8_t    * torrentHash,
1318                        int              * setmePeersKnown,
1319                        int              * setmePeersConnected,
1320                        int              * setmePeersSendingToUs,
1321                        int              * setmePeersGettingFromUs,
1322                        int              * setmePeersFrom )
1323{
1324    int i, size;
1325    const Torrent * t;
1326    const tr_peer ** peers;
1327
1328    managerLock( (tr_peerMgr*)manager );
1329
1330    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1331    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1332
1333    *setmePeersKnown          = tr_ptrArraySize( t->pool );
1334    *setmePeersConnected      = 0;
1335    *setmePeersSendingToUs    = 0;
1336    *setmePeersGettingFromUs  = 0;
1337
1338    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1339        setmePeersFrom[i] = 0;
1340
1341    for( i=0; i<size; ++i )
1342    {
1343        const tr_peer * peer = peers[i];
1344        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1345
1346        if( peer->io == NULL ) /* not connected */
1347            continue;
1348
1349        ++*setmePeersConnected;
1350
1351        ++setmePeersFrom[atom->from];
1352
1353        if( peer->rateToPeer > 0.01 )
1354            ++*setmePeersGettingFromUs;
1355
1356        if( peer->rateToClient > 0.01 )
1357            ++*setmePeersSendingToUs;
1358    }
1359
1360    managerUnlock( (tr_peerMgr*)manager );
1361}
1362
1363struct tr_peer_stat *
1364tr_peerMgrPeerStats( const tr_peerMgr  * manager,
1365                     const uint8_t     * torrentHash,
1366                     int               * setmeCount UNUSED )
1367{
1368    int i, size;
1369    const Torrent * t;
1370    tr_peer ** peers;
1371    tr_peer_stat * ret;
1372
1373    assert( manager != NULL );
1374    managerLock( (tr_peerMgr*)manager );
1375
1376    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1377    peers = getConnectedPeers( (Torrent*)t, &size );
1378    ret = tr_new0( tr_peer_stat, size );
1379
1380    for( i=0; i<size; ++i )
1381    {
1382        const tr_peer * peer = peers[i];
1383        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1384        tr_peer_stat * stat = ret + i;
1385
1386        tr_netNtop( &peer->in_addr, stat->addr, sizeof(stat->addr) );
1387        stat->port             = peer->port;
1388        stat->from             = atom->from;
1389        stat->client           = tr_strdup( peer->client ? peer->client : "" );
1390        stat->progress         = peer->progress;
1391        stat->isEncrypted      = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1392        stat->uploadToRate     = peer->rateToPeer;
1393        stat->downloadFromRate = peer->rateToClient;
1394        stat->isDownloading    = stat->uploadToRate > 0.01;
1395        stat->isUploading      = stat->downloadFromRate > 0.01;
1396    }
1397
1398    *setmeCount = size;
1399    tr_free( peers );
1400
1401    managerUnlock( (tr_peerMgr*)manager );
1402    return ret;
1403}
1404
1405/**
1406***
1407**/
1408
1409struct ChokeData
1410{
1411    tr_peer * peer;
1412    float rate;
1413    int randomKey;
1414    int preferred;
1415    int doUnchoke;
1416};
1417
1418static int
1419compareChoke( const void * va, const void * vb )
1420{
1421    const struct ChokeData * a = va;
1422    const struct ChokeData * b = vb;
1423
1424    if( a->preferred != b->preferred )
1425        return a->preferred ? -1 : 1;
1426
1427    if( a->preferred )
1428    {
1429        if( a->rate > b->rate ) return -1;
1430        if( a->rate < b->rate ) return 1;
1431        return 0;
1432    }
1433    else
1434    {
1435        return a->randomKey - b->randomKey;
1436    }
1437}
1438
1439static int
1440clientIsSnubbedBy( const tr_peer * peer )
1441{
1442    assert( peer != NULL );
1443
1444    return peer->peerSentPieceDataAt < (time(NULL) - SNUBBED_SEC);
1445}
1446
1447/**
1448***
1449**/
1450
1451static double
1452getWeightedThroughput( const tr_peer * peer )
1453{
1454    return ( 3 * peer->rateToPeer )
1455         + ( 1 * peer->rateToClient );
1456}
1457
1458static void
1459rechoke( Torrent * t )
1460{
1461    int i, peerCount, size=0, unchoked=0;
1462    const time_t fibrillationTime = time(NULL) - MIN_CHOKE_PERIOD_SEC;
1463    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1464    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1465
1466    assert( torrentIsLocked( t ) );
1467   
1468    /* sort the peers by preference and rate */
1469    for( i=0; i<peerCount; ++i )
1470    {
1471        tr_peer * peer = peers[i];
1472        struct ChokeData * node;
1473        if( peer->chokeChangedAt > fibrillationTime )
1474            continue;
1475
1476        node = &choke[size++];
1477        node->peer = peer;
1478        node->preferred = peer->peerIsInterested && !clientIsSnubbedBy(peer);
1479        node->randomKey = tr_rand( INT_MAX );
1480        node->rate = getWeightedThroughput( peer );
1481    }
1482
1483    qsort( choke, size, sizeof(struct ChokeData), compareChoke );
1484
1485    for( i=0; i<size && i<NUM_UNCHOKED_PEERS_PER_TORRENT; ++i ) {
1486        choke[i].doUnchoke = 1;
1487        ++unchoked;
1488    }
1489
1490    for( ; i<size; ++i ) {
1491        choke[i].doUnchoke = 1;
1492        ++unchoked;
1493        if( choke[i].peer->peerIsInterested )
1494            break;
1495    }
1496
1497    for( i=0; i<size; ++i )
1498        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1499
1500    /* cleanup */
1501    tr_free( choke );
1502    tr_free( peers );
1503}
1504
1505static int
1506rechokePulse( void * vtorrent )
1507{
1508    Torrent * t = vtorrent;
1509    torrentLock( t );
1510    rechoke( t );
1511    torrentUnlock( t );
1512    return TRUE;
1513}
1514
1515/***
1516****
1517****  Life and Death
1518****
1519***/
1520
1521static int
1522shouldPeerBeClosed( const Torrent * t, const tr_peer * peer, int peerCount )
1523{
1524    const tr_torrent * tor = t->tor;
1525    const time_t now = time( NULL );
1526    const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1527
1528    /* if it's marked for purging, close it */
1529    if( peer->doPurge ) {
1530        tordbg( t, "purging peer %s because its doPurge flag is set", tr_peerIoAddrStr(&atom->addr,atom->port) );
1531        return TRUE;
1532    }
1533
1534    /* if we're both seeds and it's been long enough for a pex exchange, close it */
1535    if( 1 ) {
1536        const int clientIsSeed = tr_torrentIsSeed( tor );
1537        const int peerIsSeed = atom->flags & ADDED_F_SEED_FLAG;
1538        if( peerIsSeed && clientIsSeed && ( !tr_torrentIsPexEnabled(tor) || (now-atom->time>=30) ) ) {
1539            tordbg( t, "purging peer %s because we're both seeds", tr_peerIoAddrStr(&atom->addr,atom->port) );
1540            return TRUE;
1541        }
1542    }
1543
1544    /* disconnect if it's been too long since piece data has been transferred.
1545     * this is on a sliding scale based on number of available peers... */
1546    if( 1 ) {
1547        const int relaxStrictnessIfFewerThanN = (int)((MAX_CONNECTED_PEERS_PER_TORRENT * 0.9) + 0.5);
1548        /* if we have >= relaxIfFewerThan, strictness is 100%.
1549         * if we have zero connections, strictness is 0% */
1550        const double strictness = peerCount >= relaxStrictnessIfFewerThanN
1551            ? 1.0
1552            : peerCount / (double)relaxStrictnessIfFewerThanN;
1553        const int lo = MIN_UPLOAD_IDLE_SECS;
1554        const int hi = MAX_UPLOAD_IDLE_SECS;
1555        const int limit = lo + ((hi-lo) * strictness);
1556        const time_t then = peer->pieceDataActivityDate;
1557        const int idleTime = then ? (now-then) : 0;
1558        if( idleTime > limit ) {
1559            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
1560                       tr_peerIoAddrStr(&atom->addr,atom->port), idleTime );
1561            return TRUE;
1562        }
1563    }
1564
1565    return FALSE;
1566}
1567
1568static tr_peer **
1569getPeersToClose( Torrent * t, int * setmeSize )
1570{
1571    int i, peerCount, outsize;
1572    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( t->peers, &peerCount );
1573    struct tr_peer ** ret = tr_new( tr_peer*, peerCount );
1574
1575    assert( torrentIsLocked( t ) );
1576
1577    for( i=outsize=0; i<peerCount; ++i )
1578        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
1579            ret[outsize++] = peers[i];
1580
1581    *setmeSize = outsize;
1582    return ret;
1583}
1584
1585static int
1586compareAtomByTime( const void * va, const void * vb )
1587{
1588    const struct peer_atom * a = * (const struct peer_atom**) va;
1589    const struct peer_atom * b = * (const struct peer_atom**) vb;
1590    if( a->time < b->time ) return -1;
1591    if( a->time > b->time ) return 1;
1592    return 0;
1593}
1594
1595static struct peer_atom **
1596getPeerCandidates( Torrent * t, int * setmeSize )
1597{
1598    int i, atomCount, retCount;
1599    struct peer_atom ** atoms;
1600    struct peer_atom ** ret;
1601    const time_t now = time( NULL );
1602    const int seed = tr_torrentIsSeed( t->tor );
1603
1604    assert( torrentIsLocked( t ) );
1605
1606    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &atomCount );
1607    ret = tr_new( struct peer_atom*, atomCount );
1608    for( i=retCount=0; i<atomCount; ++i )
1609    {
1610        int wait, minWait, maxWait;
1611        struct peer_atom * atom = atoms[i];
1612
1613        /* peer fed us too much bad data ... we only keep it around
1614         * now to weed it out in case someone sends it to us via pex */
1615        if( atom->myflags & MYFLAG_BANNED ) {
1616            tordbg( t, "RECONNECT peer %d (%s) is banned...",
1617                    i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1618            continue;
1619        }
1620
1621        /* we don't need two connections to the same peer... */
1622        if( peerIsInUse( t, &atom->addr ) ) {
1623            tordbg( t, "RECONNECT peer %d (%s) is in use..",
1624                    i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1625            continue;
1626        }
1627
1628        /* no need to connect if we're both seeds... */
1629        if( seed && (atom->flags & ADDED_F_SEED_FLAG) ) {
1630            tordbg( t, "RECONNECT peer %d (%s) is a seed and so are we..",
1631                    i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1632            continue;
1633        }
1634
1635        /* if we used this peer recently, give someone else a turn */
1636        minWait = 60; /* one minute */
1637        maxWait = (60 * 10); /* ten minutes */
1638        wait = atom->numFails * 15; /* add 15 secs to the wait interval for each consecutive failure*/
1639        if( wait < minWait ) wait = minWait;
1640        if( wait > maxWait ) wait = maxWait;
1641        if( ( now - atom->time ) < wait ) {
1642            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
1643                    i, tr_peerIoAddrStr(&atom->addr,atom->port), wait );
1644            continue;
1645        }
1646
1647        ret[retCount++] = atom;
1648    }
1649
1650    qsort( ret, retCount, sizeof(struct peer_atom*), compareAtomByTime );
1651    *setmeSize = retCount;
1652    return ret;
1653}
1654
1655static int
1656reconnectPulse( void * vtorrent )
1657{
1658    Torrent * t = vtorrent;
1659
1660    torrentLock( t );
1661
1662    if( !t->isRunning )
1663    {
1664        removeAllPeers( t );
1665    }
1666    else
1667    {
1668        int i, nCandidates, nBad, nAdd;
1669        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
1670        struct tr_peer ** connections = getPeersToClose( t, &nBad );
1671        const int peerCount = tr_ptrArraySize( t->peers );
1672
1673        if( nBad || nCandidates )
1674            tordbg( t, "reconnect pulse for [%s]: %d bad connections, "
1675                       "%d connection candidates, %d atoms, max per pulse is %d",
1676                       t->tor->info.name, nBad, nCandidates,
1677                       tr_ptrArraySize(t->pool),
1678                       (int)MAX_RECONNECTIONS_PER_PULSE );
1679
1680        /* disconnect some peers.
1681           if we got transferred piece data, then they might be good peers,
1682           so reset their `numFails' weight to zero.  otherwise we connected
1683           to them fruitlessly, so mark it as another fail */
1684        for( i=0; i<nBad; ++i ) {
1685            tr_peer * peer = connections[i];
1686            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1687            if( peer->pieceDataActivityDate )
1688                atom->numFails = 0;
1689            else
1690                ++atom->numFails;
1691            removePeer( t, peer );
1692        }
1693
1694        /* add some new ones */
1695        nAdd = MAX_CONNECTED_PEERS_PER_TORRENT - peerCount;
1696        for( i=0; i<nAdd && i<nCandidates && i<MAX_RECONNECTIONS_PER_PULSE; ++i )
1697        {
1698            tr_peerMgr * mgr = t->manager;
1699            struct peer_atom * atom = candidates[i];
1700            tr_peerIo * io;
1701            tr_handshake * handshake;
1702
1703            tordbg( t, "Starting an OUTGOING connection with %s",
1704                       tr_peerIoAddrStr( &atom->addr, atom->port ) );
1705
1706            io = tr_peerIoNewOutgoing( mgr->handle, &atom->addr, atom->port, t->hash );
1707
1708            handshake = tr_handshakeNew( io,
1709                                         mgr->handle->encryptionMode,
1710                                         myHandshakeDoneCB,
1711                                         mgr );
1712
1713            assert( tr_peerIoGetTorrentHash( io ) != NULL );
1714
1715            tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake, handshakeCompare );
1716
1717            atom->time = time( NULL );
1718        }
1719
1720        /* cleanup */
1721        tr_free( connections );
1722        tr_free( candidates );
1723    }
1724
1725    torrentUnlock( t );
1726    return TRUE;
1727}
Note: See TracBrowser for help on using the repository browser.