source: trunk/libtransmission/peer-mgr.c @ 3861

Last change on this file since 3861 was 3861, checked in by charles, 15 years ago

Add "Status" column to tr_peer_stat.

  • Property svn:keywords set to Date Rev Author Id
File size: 49.1 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 3861 2007-11-17 23:43:33Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17#include <limits.h> /* INT_MAX */
18
19#include <libgen.h> /* basename */
20#include <arpa/inet.h> /* inet_ntoa */
21
22#include <event.h>
23
24#include "transmission.h"
25#include "clients.h"
26#include "completion.h"
27#include "crypto.h"
28#include "handshake.h"
29#include "net.h"
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-mgr-private.h"
33#include "peer-msgs.h"
34#include "platform.h"
35#include "ptrarray.h"
36#include "ratecontrol.h"
37#include "shared.h"
38#include "trcompat.h" /* strlcpy */
39#include "trevent.h"
40#include "utils.h"
41
42enum
43{
44    /* how frequently to change which peers are choked */
45    RECHOKE_PERIOD_MSEC = (1000),
46
47    /* how frequently to decide which peers live and die */
48    RECONNECT_PERIOD_MSEC = (5 * 1000),
49
50    /* how frequently to refill peers' request lists */
51    REFILL_PERIOD_MSEC = 666,
52
53    /* don't change a peer's choke status more often than this */
54    MIN_CHOKE_PERIOD_SEC = 10,
55
56    /* following the BT spec, we consider ourselves `snubbed' if
57     * we're we don't get piece data from a peer in this long */
58    SNUBBED_SEC = 60,
59
60    /* arbitrary */
61    MAX_CONNECTED_PEERS_PER_TORRENT = 60,
62
63    /* when many peers are available, keep idle ones this long */
64    MIN_UPLOAD_IDLE_SECS = (60 * 3),
65
66    /* when few peers are available, keep idle ones this long */
67    MAX_UPLOAD_IDLE_SECS = (60 * 10),
68
69    /* how many peers to unchoke per-torrent. */
70    /* FIXME: make this user-configurable? */
71    NUM_UNCHOKED_PEERS_PER_TORRENT = 20, /* arbitrary */
72
73    /* set this too high and there will be a lot of churn.
74     * set it too low and you'll get peers too slowly */
75    MAX_RECONNECTIONS_PER_PULSE = 5,
76
77    /* corresponds to ut_pex's added.f flags */
78    ADDED_F_ENCRYPTION_FLAG = 1,
79    /* corresponds to ut_pex's added.f flags */
80    ADDED_F_SEED_FLAG = 2,
81
82    /* number of bad pieces a peer is allowed to send before we ban them */
83    MAX_BAD_PIECES_PER_PEER = 3,
84    /* use for bitwise operations w/peer_atom.myflags */
85    MYFLAG_BANNED = 1
86};
87
88
89/**
90***
91**/
92
93/* We keep one of these for every peer we know about, whether
94 * it's connected or not, so the struct must be small.
95 * When our current connections underperform, we dip back
96 * into this list for new ones. */
97struct peer_atom
98{   
99    uint8_t from;
100    uint8_t flags; /* these match the added_f flags */
101    uint8_t myflags; /* flags that aren't defined in added_f */
102    uint16_t port;
103    uint16_t numFails;
104    struct in_addr addr;
105    time_t time;
106};
107
108typedef struct
109{
110    uint8_t hash[SHA_DIGEST_LENGTH];
111    tr_ptrArray * outgoingHandshakes; /* tr_handshake */
112    tr_ptrArray * pool; /* struct peer_atom */
113    tr_ptrArray * peers; /* tr_peer */
114    tr_timer * reconnectTimer;
115    tr_timer * rechokeTimer;
116    tr_timer * refillTimer;
117    tr_torrent * tor;
118    tr_bitfield * requested;
119
120    unsigned int isRunning : 1;
121
122    struct tr_peerMgr * manager;
123}
124Torrent;
125
126struct tr_peerMgr
127{
128    tr_handle * handle;
129    tr_ptrArray * torrents; /* Torrent */
130    tr_ptrArray * incomingHandshakes; /* tr_handshake */
131};
132
133/**
134***
135**/
136
137static void
138myDebug( const char * file, int line, const Torrent * t, const char * fmt, ... )
139{
140    FILE * fp = tr_getLog( );
141    if( fp != NULL )
142    {
143        va_list args;
144        char timestr[64];
145        struct evbuffer * buf = evbuffer_new( );
146        char * myfile = tr_strdup( file );
147
148        evbuffer_add_printf( buf, "[%s] ", tr_getLogTimeStr( timestr, sizeof(timestr) ) );
149        if( t != NULL )
150            evbuffer_add_printf( buf, "%s ", t->tor->info.name );
151        va_start( args, fmt );
152        evbuffer_add_vprintf( buf, fmt, args );
153        va_end( args );
154        evbuffer_add_printf( buf, " (%s:%d)\n", basename(myfile), line );
155        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
156
157        tr_free( myfile );
158        evbuffer_free( buf );
159    }
160}
161
162#define tordbg(t, fmt...) myDebug(__FILE__, __LINE__, t, ##fmt )
163
164/**
165***
166**/
167
168static void
169managerLock( struct tr_peerMgr * manager )
170{
171    tr_globalLock( manager->handle );
172}
173static void
174managerUnlock( struct tr_peerMgr * manager )
175{
176    tr_globalUnlock( manager->handle );
177}
178static void
179torrentLock( Torrent * torrent )
180{
181    managerLock( torrent->manager );
182}
183static void
184torrentUnlock( Torrent * torrent )
185{
186    managerUnlock( torrent->manager );
187}
188static int
189torrentIsLocked( const Torrent * t )
190{
191    return tr_globalIsLocked( t->manager->handle );
192}
193
194/**
195***
196**/
197
198static int
199compareAddresses( const struct in_addr * a, const struct in_addr * b )
200{
201    return tr_compareUint32( a->s_addr, b->s_addr );
202}
203
204static int
205handshakeCompareToAddr( const void * va, const void * vb )
206{
207    const tr_handshake * a = va;
208    return compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
209}
210
211static int
212handshakeCompare( const void * a, const void * b )
213{
214    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
215}
216
217static tr_handshake*
218getExistingHandshake( tr_ptrArray * handshakes, const struct in_addr * in_addr )
219{
220    return tr_ptrArrayFindSorted( handshakes,
221                                  in_addr,
222                                  handshakeCompareToAddr );
223}
224
225static int
226comparePeerAtomToAddress( const void * va, const void * vb )
227{
228    const struct peer_atom * a = va;
229    return compareAddresses( &a->addr, vb );
230}
231
232static int
233comparePeerAtoms( const void * va, const void * vb )
234{
235    const struct peer_atom * b = vb;
236    return comparePeerAtomToAddress( va, &b->addr );
237}
238
239/**
240***
241**/
242
243static int
244torrentCompare( const void * va, const void * vb )
245{
246    const Torrent * a = va;
247    const Torrent * b = vb;
248    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
249}
250
251static int
252torrentCompareToHash( const void * va, const void * vb )
253{
254    const Torrent * a = va;
255    const uint8_t * b_hash = vb;
256    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
257}
258
259static Torrent*
260getExistingTorrent( tr_peerMgr * manager, const uint8_t * hash )
261{
262    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
263                                             hash,
264                                             torrentCompareToHash );
265}
266
267static int
268peerCompare( const void * va, const void * vb )
269{
270    const tr_peer * a = va;
271    const tr_peer * b = vb;
272    return compareAddresses( &a->in_addr, &b->in_addr );
273}
274
275static int
276peerCompareToAddr( const void * va, const void * vb )
277{
278    const tr_peer * a = va;
279    return compareAddresses( &a->in_addr, vb );
280}
281
282static tr_peer*
283getExistingPeer( Torrent * torrent, const struct in_addr * in_addr )
284{
285    assert( torrentIsLocked( torrent ) );
286    assert( in_addr != NULL );
287
288    return (tr_peer*) tr_ptrArrayFindSorted( torrent->peers,
289                                             in_addr,
290                                             peerCompareToAddr );
291}
292
293static struct peer_atom*
294getExistingAtom( const Torrent * t, const struct in_addr * addr )
295{
296    assert( torrentIsLocked( t ) );
297    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
298}
299
300static int
301peerIsInUse( const Torrent * ct, const struct in_addr * addr )
302{
303    Torrent * t = (Torrent*) ct;
304
305    assert( torrentIsLocked ( t ) );
306
307    return getExistingPeer( t, addr )
308        || getExistingHandshake( t->outgoingHandshakes, addr )
309        || getExistingHandshake( t->manager->incomingHandshakes, addr );
310}
311
312static tr_peer*
313peerConstructor( const struct in_addr * in_addr )
314{
315    tr_peer * p;
316    p = tr_new0( tr_peer, 1 );
317    p->rcToClient = tr_rcInit( );
318    p->rcToPeer = tr_rcInit( );
319    memcpy( &p->in_addr, in_addr, sizeof(struct in_addr) );
320    return p;
321}
322
323static tr_peer*
324getPeer( Torrent * torrent, const struct in_addr * in_addr )
325{
326    tr_peer * peer;
327
328    assert( torrentIsLocked( torrent ) );
329
330    peer = getExistingPeer( torrent, in_addr );
331
332    if( peer == NULL ) {
333        peer = peerConstructor( in_addr );
334        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
335    }
336
337    return peer;
338}
339
340static void
341peerDestructor( tr_peer * peer )
342{
343    assert( peer != NULL );
344    assert( peer->msgs != NULL );
345
346    tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
347    tr_peerMsgsFree( peer->msgs );
348
349    tr_peerIoFree( peer->io );
350
351    tr_bitfieldFree( peer->have );
352    tr_bitfieldFree( peer->blame );
353    tr_rcClose( peer->rcToClient );
354    tr_rcClose( peer->rcToPeer );
355    tr_free( peer->client );
356    tr_free( peer );
357}
358
359static void
360removePeer( Torrent * t, tr_peer * peer )
361{
362    tr_peer * removed;
363    struct peer_atom * atom;
364
365    assert( torrentIsLocked( t ) );
366
367    atom = getExistingAtom( t, &peer->in_addr );
368    assert( atom != NULL );
369    atom->time = time( NULL );
370
371    removed = tr_ptrArrayRemoveSorted  ( t->peers, peer, peerCompare );
372    assert( removed == peer );
373    peerDestructor( removed );
374}
375
376static void
377removeAllPeers( Torrent * t )
378{
379    while( !tr_ptrArrayEmpty( t->peers ) )
380        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
381}
382
383static void
384torrentDestructor( Torrent * t )
385{
386    uint8_t hash[SHA_DIGEST_LENGTH];
387
388    assert( t != NULL );
389    assert( !t->isRunning );
390    assert( t->peers != NULL );
391    assert( torrentIsLocked( t ) );
392    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
393    assert( tr_ptrArrayEmpty( t->peers ) );
394
395    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
396
397    tr_timerFree( &t->reconnectTimer );
398    tr_timerFree( &t->rechokeTimer );
399    tr_timerFree( &t->refillTimer );
400
401    tr_bitfieldFree( t->requested );
402    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
403    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
404    tr_ptrArrayFree( t->peers, NULL );
405
406    tr_free( t );
407}
408
409static Torrent*
410torrentConstructor( tr_peerMgr * manager, tr_torrent * tor )
411{
412    Torrent * t;
413
414    t = tr_new0( Torrent, 1 );
415    t->manager = manager;
416    t->tor = tor;
417    t->pool = tr_ptrArrayNew( );
418    t->peers = tr_ptrArrayNew( );
419    t->outgoingHandshakes = tr_ptrArrayNew( );
420    t->requested = tr_bitfieldNew( tor->blockCount );
421    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
422
423    return t;
424}
425
426/**
427***
428**/
429
430struct tr_bitfield *
431tr_peerMgrGenerateAllowedSet( const uint32_t         setCount,
432                              const uint32_t         pieceCount,
433                              const uint8_t          infohash[20],
434                              const struct in_addr * ip )
435{
436    /* This has been checked against the spec example implementation. Feeding it with :
437    setCount = 9, pieceCount = 1313, infohash = Oxaa,0xaa,...0xaa, ip = 80.4.4.200
438generate :
439    1059, 431, 808, 1217, 287, 376, 1188, 353, 508
440    but since we're storing in a bitfield, it won't be in this order... */
441    /* TODO : We should translate link-local IPv4 adresses to external IP,
442     * so that being on same local network gives us the same allowed pieces */
443   
444    uint8_t *seed;
445    char buf[4];
446    uint32_t allowedPieceCount = 0;
447    tr_bitfield_t * ret;
448
449#if 0
450    printf( "%d piece allowed fast set for torrent with %d pieces and hex infohash\n", setCount, pieceCount );
451    printf( "%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x for node with IP %s:\n",
452            infohash[0], infohash[1], infohash[2], infohash[3], infohash[4], infohash[5], infohash[6], infohash[7], infohash[8], infohash[9],
453            infohash[10], infohash[11], infohash[12], infohash[13], infohash[14], infohash[15], infohash[16], infohash[7], infohash[18], infohash[19],
454            inet_ntoa( *ip ) );
455#endif
456   
457    seed = malloc(4 + SHA_DIGEST_LENGTH);
458   
459    ret = tr_bitfieldNew( pieceCount );
460   
461    /* We need a seed based on most significant bytes of peer address
462        concatenated with torrent's infohash */
463    *(uint32_t*)buf = ntohl( htonl(ip->s_addr) & 0xffffff00 );
464   
465    memcpy( seed, &buf, 4 );
466    memcpy( seed + 4, infohash, SHA_DIGEST_LENGTH );
467   
468    tr_sha1( seed, seed, 4 + SHA_DIGEST_LENGTH, NULL );
469   
470    while ( allowedPieceCount < setCount )
471    {
472        int i;
473        for ( i = 0 ; i < 5 && allowedPieceCount < setCount ; i++ )
474        {
475            /* We generate indices from 4-byte chunks of the seed */
476            uint32_t j = i * 4;
477            uint32_t y = ntohl( *(uint32_t*)(seed + j) );
478            uint32_t index = y % pieceCount;
479           
480            if ( !tr_bitfieldHas( ret, index ) )
481            {
482                tr_bitfieldAdd( ret, index );
483                allowedPieceCount++;
484            }
485        }
486        /* We randomize the seed, in case we need to iterate more */
487        tr_sha1( seed, seed, SHA_DIGEST_LENGTH, NULL );
488    }
489    tr_free( seed );
490   
491    return ret;
492}
493
494tr_peerMgr*
495tr_peerMgrNew( tr_handle * handle )
496{
497    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
498    m->handle = handle;
499    m->torrents = tr_ptrArrayNew( );
500    m->incomingHandshakes = tr_ptrArrayNew( );
501    return m;
502}
503
504void
505tr_peerMgrFree( tr_peerMgr * manager )
506{
507    managerLock( manager );
508
509    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
510     * the item from manager->handshakes, so this is a little roundabout... */
511    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
512        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
513    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
514
515    /* free the torrents. */
516    tr_ptrArrayFree( manager->torrents, (PtrArrayForeachFunc)torrentDestructor );
517
518    managerUnlock( manager );
519    tr_free( manager );
520}
521
522static tr_peer**
523getConnectedPeers( Torrent * t, int * setmeCount )
524{
525    int i, peerCount, connectionCount;
526    tr_peer **peers;
527    tr_peer **ret;
528
529    assert( torrentIsLocked( t ) );
530
531    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
532    ret = tr_new( tr_peer*, peerCount );
533
534    for( i=connectionCount=0; i<peerCount; ++i )
535        if( peers[i]->msgs != NULL )
536            ret[connectionCount++] = peers[i];
537
538    *setmeCount = connectionCount;
539    return ret;
540}
541
542/***
543****  Refill
544***/
545
546struct tr_refill_piece
547{
548    tr_priority_t priority;
549    int percentDone;
550    uint16_t random;
551    uint32_t piece;
552    uint32_t peerCount;
553    uint32_t fastAllowed;
554    uint32_t suggested;
555};
556
557static int
558compareRefillPiece (const void * aIn, const void * bIn)
559{
560    const struct tr_refill_piece * a = aIn;
561    const struct tr_refill_piece * b = bIn;
562   
563    /* if one *might be* fastallowed to us, get it first...
564     * I'm putting it on top so we prioritize those pieces at
565     * startup, then we'll have them, and we'll be denied access
566     * to them */
567    if (a->fastAllowed != b->fastAllowed)
568        return a->fastAllowed < b->fastAllowed ? -1 : 1;
569   
570    /* if one piece has a higher priority, it goes first */
571    if (a->priority != b->priority)
572        return a->priority > b->priority ? -1 : 1;
573   
574    /* otherwise if one was suggested to us, get it */
575    if (a->suggested != b->suggested)
576        return a->suggested < b->suggested ? -1 : 1;
577
578    /* try to fill partial pieces */
579    if( a->percentDone != b->percentDone )
580        return a->percentDone > b->percentDone ? -1 : 1;
581   
582    /* otherwise if one has fewer peers, it goes first */
583    if (a->peerCount != b->peerCount)
584        return a->peerCount < b->peerCount ? -1 : 1;
585
586    /* otherwise go with our random seed */
587    return tr_compareUint16( a->random, b->random );
588}
589
590static int
591isPieceInteresting( const tr_torrent  * tor,
592                    int                 piece )
593{
594    if( tor->info.pieces[piece].dnd ) /* we don't want it */
595        return 0;
596
597    if( tr_cpPieceIsComplete( tor->completion, piece ) ) /* we have it */
598        return 0;
599
600    return 1;
601}
602
603static uint32_t*
604getPreferredPieces( Torrent     * t,
605                    uint32_t    * pieceCount )
606{
607    const tr_torrent * tor = t->tor;
608    const tr_info * inf = &tor->info;
609    int i;
610    uint32_t poolSize = 0;
611    uint32_t * pool = tr_new( uint32_t, inf->pieceCount );
612    int peerCount;
613    tr_peer** peers;
614
615    assert( torrentIsLocked( t ) );
616
617    peers = getConnectedPeers( t, &peerCount );
618
619    for( i=0; i<inf->pieceCount; ++i )
620        if( isPieceInteresting( tor, i ) )
621            pool[poolSize++] = i;
622
623    /* sort the pool from most interesting to least... */
624    if( poolSize > 1 )
625    {
626        uint32_t j;
627        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
628
629        for( j=0; j<poolSize; ++j )
630        {
631            int k;
632            const int piece = pool[j];
633            struct tr_refill_piece * setme = p + j;
634
635            setme->piece = piece;
636            setme->priority = inf->pieces[piece].priority;
637            setme->peerCount = 0;
638            setme->fastAllowed = 0;
639            setme->random = tr_rand( UINT16_MAX );
640            setme->percentDone = (int)( 100.0 * tr_cpPercentBlocksInPiece( tor->completion, piece ) );
641
642            for( k=0; k<peerCount; ++k ) {
643                const tr_peer * peer = peers[k];
644                if( peer->peerIsInterested && !peer->clientIsChoked && tr_bitfieldHas( peer->have, piece ) )
645                    ++setme->peerCount;
646                /* The fast peer extension doesn't force a peer to actually HAVE a fast-allowed piece,
647                    but we're guaranteed to get the same pieces from different peers,
648                    so we'll build a list and pray one actually have this one */
649                setme->fastAllowed = tr_peerMsgsIsPieceFastAllowed( peer->msgs, i );
650                /* Also, if someone SUGGESTed a piece to us, prioritize it over non-suggested others
651                 */
652                setme->suggested   = tr_peerMsgsIsPieceSuggested( peer->msgs, i );
653            }
654        }
655
656        qsort( p, poolSize, sizeof(struct tr_refill_piece), compareRefillPiece );
657
658        for( j=0; j<poolSize; ++j )
659            pool[j] = p[j].piece;
660
661        tr_free( p );
662    }
663
664    tr_free( peers );
665
666    *pieceCount = poolSize;
667    return pool;
668}
669
670static uint64_t*
671getPreferredBlocks( Torrent * t, uint64_t * setmeCount )
672{
673    uint32_t i;
674    uint32_t pieceCount;
675    uint32_t * pieces;
676    uint64_t *req, *unreq, *ret, *walk;
677    int reqCount, unreqCount;
678    const tr_torrent * tor = t->tor;
679
680    assert( torrentIsLocked( t ) );
681
682    pieces = getPreferredPieces( t, &pieceCount );
683
684    req = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
685    reqCount = 0;
686    unreq = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
687    unreqCount = 0;
688
689    for( i=0; i<pieceCount; ++i ) {
690        const uint32_t index = pieces[i];
691        const int begin = tr_torPieceFirstBlock( tor, index );
692        const int end = begin + tr_torPieceCountBlocks( tor, (int)index );
693        int block;
694        for( block=begin; block<end; ++block )
695            if( tr_cpBlockIsComplete( tor->completion, block ) )
696                continue;
697            else if( tr_bitfieldHas( t->requested, block ) )
698                req[reqCount++] = block;
699            else
700                unreq[unreqCount++] = block;
701    }
702
703    ret = walk = tr_new( uint64_t, unreqCount + reqCount );
704    memcpy( walk, unreq, sizeof(uint64_t) * unreqCount );
705    walk += unreqCount;
706    memcpy( walk, req, sizeof(uint64_t) * reqCount );
707    walk += reqCount;
708    assert( ( walk - ret ) == ( unreqCount + reqCount ) );
709    *setmeCount = walk - ret;
710
711    tr_free( req );
712    tr_free( unreq );
713    tr_free( pieces );
714
715    return ret;
716}
717
718static int
719refillPulse( void * vtorrent )
720{
721    Torrent * t = vtorrent;
722    tr_torrent * tor = t->tor;
723    uint32_t i;
724    int peerCount;
725    tr_peer ** peers;
726    uint64_t blockCount;
727    uint64_t * blocks;
728
729    if( !t->isRunning )
730        return TRUE;
731    if( tr_torrentIsSeed( t->tor ) )
732        return TRUE;
733
734    torrentLock( t );
735    tordbg( t, "Refilling Request Buffers..." );
736
737    blocks = getPreferredBlocks( t, &blockCount );
738    peers = getConnectedPeers( t, &peerCount );
739
740    for( i=0; peerCount && i<blockCount; ++i )
741    {
742        const uint64_t block = blocks[i];
743        const uint32_t index = tr_torBlockPiece( tor, block );
744        const uint32_t begin = (block * tor->blockSize) - (index * tor->info.pieceSize);
745        const uint32_t length = tr_torBlockCountBytes( tor, (int)block );
746        int j;
747        assert( _tr_block( tor, index, begin ) == (int)block );
748        assert( begin < (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
749        assert( (begin + length) <= (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
750
751        /* find a peer who can ask for this block */
752        for( j=0; j<peerCount; )
753        {
754            const int val = tr_peerMsgsAddRequest( peers[j]->msgs, index, begin, length );
755            switch( val )
756            {
757                case TR_ADDREQ_FULL: 
758                case TR_ADDREQ_CLIENT_CHOKED:
759                    memmove( peers+j, peers+j+1, sizeof(tr_peer*)*(--peerCount-j) );
760                    break;
761
762                case TR_ADDREQ_MISSING: 
763                case TR_ADDREQ_DUPLICATE: 
764                    ++j;
765                    break;
766
767                case TR_ADDREQ_OK:
768                    tr_bitfieldAdd( t->requested, block );
769                    j = peerCount;
770                    break;
771
772                default:
773                    assert( 0 && "unhandled value" );
774                    break;
775            }
776        }
777    }
778
779    /* cleanup */
780    tr_free( peers );
781    tr_free( blocks );
782
783    t->refillTimer = NULL;
784    torrentUnlock( t );
785    return FALSE;
786}
787
788static void
789broadcastClientHave( Torrent * t, uint32_t index )
790{
791    int i, size;
792    tr_peer ** peers;
793
794    assert( torrentIsLocked( t ) );
795
796    peers = getConnectedPeers( t, &size );
797    for( i=0; i<size; ++i )
798        tr_peerMsgsHave( peers[i]->msgs, index );
799    tr_free( peers );
800}
801
802static void
803broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
804{
805    int i, size;
806    tr_peer ** peers;
807
808    assert( torrentIsLocked( t ) );
809
810    peers = getConnectedPeers( t, &size );
811    for( i=0; i<size; ++i )
812        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
813    tr_free( peers );
814}
815
816static void
817addStrike( Torrent * t, tr_peer * peer )
818{
819    tordbg( t, "increasing peer %s strike count to %d", tr_peerIoAddrStr(&peer->in_addr,peer->port), peer->strikes+1 );
820
821    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
822    {
823        struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
824        atom->myflags |= MYFLAG_BANNED;
825        peer->doPurge = 1;
826        tordbg( t, "banning peer %s", tr_peerIoAddrStr(&atom->addr,atom->port) );
827    }
828}
829
830static void
831msgsCallbackFunc( void * vpeer, void * vevent, void * vt )
832{
833    tr_peer * peer = vpeer;
834    Torrent * t = (Torrent *) vt;
835    const tr_peermsgs_event * e = (const tr_peermsgs_event *) vevent;
836
837    torrentLock( t );
838
839    switch( e->eventType )
840    {
841        case TR_PEERMSG_NEED_REQ:
842            if( t->refillTimer == NULL )
843                t->refillTimer = tr_timerNew( t->manager->handle,
844                                              refillPulse, t,
845                                              REFILL_PERIOD_MSEC );
846            break;
847
848        case TR_PEERMSG_CANCEL:
849            tr_bitfieldRem( t->requested, _tr_block( t->tor, e->pieceIndex, e->offset ) );
850            break;
851
852        case TR_PEERMSG_CLIENT_HAVE:
853            broadcastClientHave( t, e->pieceIndex );
854            tr_torrentRecheckCompleteness( t->tor );
855            break;
856
857        case TR_PEERMSG_PEER_PROGRESS: {
858            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
859            const int peerIsSeed = e->progress >= 1.0;
860            if( peerIsSeed ) {
861                tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr(&atom->addr,atom->port) );
862                atom->flags |= ADDED_F_SEED_FLAG;
863            } else {
864                tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr(&atom->addr,atom->port) );
865                atom->flags &= ~ADDED_F_SEED_FLAG;
866            } break;
867        }
868
869        case TR_PEERMSG_CLIENT_BLOCK:
870            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
871            break;
872
873        case TR_PEERMSG_GOT_ASSERT_ERROR:
874            addStrike( t, peer );
875            peer->doPurge = 1;
876            break;
877
878        case TR_PEERMSG_GOT_ERROR:
879            peer->doPurge = 1;
880            break;
881
882        default:
883            assert(0);
884    }
885
886    torrentUnlock( t );
887}
888
889static void
890ensureAtomExists( Torrent * t, const struct in_addr * addr, uint16_t port, uint8_t flags, uint8_t from )
891{
892    if( getExistingAtom( t, addr ) == NULL )
893    {
894        struct peer_atom * a;
895        a = tr_new0( struct peer_atom, 1 );
896        a->addr = *addr;
897        a->port = port;
898        a->flags = flags;
899        a->from = from;
900        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr(&a->addr,a->port) );
901        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
902    }
903}
904
905/* FIXME: this is kind of a mess. */
906static void
907myHandshakeDoneCB( tr_handshake    * handshake,
908                   tr_peerIo       * io,
909                   int               isConnected,
910                   const uint8_t   * peer_id,
911                   void            * vmanager )
912{
913    int ok = isConnected;
914    uint16_t port;
915    const struct in_addr * addr;
916    tr_peerMgr * manager = (tr_peerMgr*) vmanager;
917    Torrent * t;
918    tr_handshake * ours;
919
920    assert( io != NULL );
921    assert( isConnected==0 || isConnected==1 );
922
923    t = tr_peerIoHasTorrentHash( io )
924        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
925        : NULL;
926
927    if( tr_peerIoIsIncoming ( io ) )
928        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
929                                        handshake, handshakeCompare );
930    else if( t != NULL )
931        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
932                                        handshake, handshakeCompare );
933    else
934        ours = handshake;
935
936    assert( ours != NULL );
937    assert( ours == handshake );
938
939    if( t != NULL )
940        torrentLock( t );
941
942    addr = tr_peerIoGetAddress( io, &port );
943
944    if( !ok || !t || !t->isRunning )
945    {
946        if( t ) {
947            struct peer_atom * atom = getExistingAtom( t, addr );
948            if( atom )
949                ++atom->numFails;
950        }
951
952        tr_peerIoFree( io );
953    }
954    else /* looking good */
955    {
956        struct peer_atom * atom;
957        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
958        atom = getExistingAtom( t, addr );
959
960        if( atom->myflags & MYFLAG_BANNED )
961        {
962            tordbg( t, "banned peer %s tried to reconnect", tr_peerIoAddrStr(&atom->addr,atom->port) );
963            tr_peerIoFree( io );
964        }
965        else if( tr_ptrArraySize( t->peers ) >= MAX_CONNECTED_PEERS_PER_TORRENT )
966        {
967            tr_peerIoFree( io );
968        }
969        else
970        {
971            tr_peer * peer = getExistingPeer( t, addr );
972
973            if( peer != NULL ) /* we already have this peer */
974            {
975                tr_peerIoFree( io );
976            }
977            else
978            {
979                peer = getPeer( t, addr );
980                tr_free( peer->client );
981                peer->client = peer_id ? tr_clientForId( peer_id ) : NULL;
982                peer->port = port;
983                peer->io = io;
984                peer->msgs = tr_peerMsgsNew( t->tor, peer, msgsCallbackFunc, t, &peer->msgsTag );
985                atom->time = time( NULL );
986            }
987        }
988    }
989
990    if( t != NULL )
991        torrentUnlock( t );
992}
993
994void
995tr_peerMgrAddIncoming( tr_peerMgr      * manager,
996                       struct in_addr  * addr,
997                       uint16_t          port,
998                       int               socket )
999{
1000    managerLock( manager );
1001
1002    if( getExistingHandshake( manager->incomingHandshakes, addr ) )
1003    {
1004        tr_netClose( socket );
1005    }
1006    else /* we don't have a connetion to them yet... */
1007    {
1008        tr_peerIo * io;
1009        tr_handshake * handshake;
1010
1011        tordbg( NULL, "Got an INCOMING connection with %s", tr_peerIoAddrStr( addr, port ) );
1012
1013        io = tr_peerIoNewIncoming( manager->handle, addr, port, socket );
1014
1015        handshake = tr_handshakeNew( io,
1016                                     manager->handle->encryptionMode,
1017                                     myHandshakeDoneCB,
1018                                     manager );
1019
1020        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake, handshakeCompare );
1021    }
1022
1023    managerUnlock( manager );
1024}
1025
1026void
1027tr_peerMgrAddPex( tr_peerMgr     * manager,
1028                  const uint8_t  * torrentHash,
1029                  uint8_t          from,
1030                  const tr_pex   * pex,
1031                  int              pexCount )
1032{
1033    Torrent * t;
1034    const tr_pex * end;
1035
1036    managerLock( manager );
1037
1038    t = getExistingTorrent( manager, torrentHash );
1039    for( end=pex+pexCount; pex!=end; ++pex )
1040        ensureAtomExists( t, &pex->in_addr, pex->port, pex->flags, from );
1041
1042    managerUnlock( manager );
1043}
1044
1045void
1046tr_peerMgrAddPeers( tr_peerMgr    * manager,
1047                    const uint8_t * torrentHash,
1048                    uint8_t         from,
1049                    const uint8_t * peerCompact,
1050                    int             peerCount )
1051{
1052    int i;
1053    const uint8_t * walk = peerCompact;
1054    Torrent * t;
1055
1056    managerLock( manager );
1057
1058    t = getExistingTorrent( manager, torrentHash );
1059    for( i=0; t!=NULL && i<peerCount; ++i )
1060    {
1061        struct in_addr addr;
1062        uint16_t port;
1063        memcpy( &addr, walk, 4 ); walk += 4;
1064        memcpy( &port, walk, 2 ); walk += 2;
1065        ensureAtomExists( t, &addr, port, 0, from );
1066    }
1067
1068    managerUnlock( manager );
1069}
1070
1071/**
1072***
1073**/
1074
1075void
1076tr_peerMgrSetBlame( tr_peerMgr     * manager,
1077                    const uint8_t  * torrentHash,
1078                    int              pieceIndex,
1079                    int              success )
1080{
1081    if( !success )
1082    {
1083        int peerCount, i;
1084        Torrent * t = getExistingTorrent( manager, torrentHash );
1085        tr_peer ** peers;
1086
1087        assert( torrentIsLocked( t ) );
1088
1089        peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1090        for( i=0; i<peerCount; ++i )
1091        {
1092            tr_peer * peer = peers[i];
1093            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1094            {
1095                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1096                           tr_peerIoAddrStr(&peer->in_addr,peer->port),
1097                           pieceIndex, (int)peer->strikes+1 );
1098                addStrike( t, peer );
1099            }
1100        }
1101    }
1102}
1103
1104int
1105tr_pexCompare( const void * va, const void * vb )
1106{
1107    const tr_pex * a = (const tr_pex *) va;
1108    const tr_pex * b = (const tr_pex *) vb;
1109    int i = memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
1110    if( i ) return i;
1111    if( a->port < b->port ) return -1;
1112    if( a->port > b->port ) return 1;
1113    return 0;
1114}
1115
1116int tr_pexCompare( const void * a, const void * b );
1117
1118static int
1119peerPrefersCrypto( const tr_peer * peer )
1120{
1121    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1122        return TRUE;
1123
1124    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1125        return FALSE;
1126
1127    return tr_peerIoIsEncrypted( peer->io );
1128};
1129
1130int
1131tr_peerMgrGetPeers( tr_peerMgr      * manager,
1132                    const uint8_t   * torrentHash,
1133                    tr_pex         ** setme_pex )
1134{
1135    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1136    int i, peerCount;
1137    const tr_peer ** peers;
1138    tr_pex * pex;
1139    tr_pex * walk;
1140
1141    torrentLock( (Torrent*)t );
1142
1143    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1144    pex = walk = tr_new( tr_pex, peerCount );
1145
1146    for( i=0; i<peerCount; ++i, ++walk )
1147    {
1148        const tr_peer * peer = peers[i];
1149
1150        walk->in_addr = peer->in_addr;
1151
1152        walk->port = peer->port;
1153
1154        walk->flags = 0;
1155        if( peerPrefersCrypto(peer) )  walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1156        if( peer->progress >= 1.0 )    walk->flags |= ADDED_F_SEED_FLAG;
1157    }
1158
1159    assert( ( walk - pex ) == peerCount );
1160    qsort( pex, peerCount, sizeof(tr_pex), tr_pexCompare );
1161    *setme_pex = pex;
1162
1163    torrentUnlock( (Torrent*)t );
1164
1165    return peerCount;
1166}
1167
1168static int reconnectPulse( void * vtorrent );
1169static int rechokePulse( void * vtorrent );
1170
1171void
1172tr_peerMgrStartTorrent( tr_peerMgr     * manager,
1173                        const uint8_t  * torrentHash )
1174{
1175    Torrent * t;
1176
1177    managerLock( manager );
1178
1179    t = getExistingTorrent( manager, torrentHash );
1180
1181    assert( t != NULL );
1182    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1183    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1184
1185    if( !t->isRunning )
1186    {
1187        t->isRunning = 1;
1188
1189        t->reconnectTimer = tr_timerNew( t->manager->handle,
1190                                         reconnectPulse, t,
1191                                         RECONNECT_PERIOD_MSEC );
1192
1193        t->rechokeTimer = tr_timerNew( t->manager->handle,
1194                                       rechokePulse, t,
1195                                       RECHOKE_PERIOD_MSEC );
1196
1197        reconnectPulse( t );
1198
1199        rechokePulse( t );
1200    }
1201
1202    managerUnlock( manager );
1203}
1204
1205static void
1206stopTorrent( Torrent * t )
1207{
1208    assert( torrentIsLocked( t ) );
1209
1210    t->isRunning = 0;
1211    tr_timerFree( &t->rechokeTimer );
1212    tr_timerFree( &t->reconnectTimer );
1213
1214    /* disconnect the peers. */
1215    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)peerDestructor );
1216    tr_ptrArrayClear( t->peers );
1217
1218    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1219     * which removes the handshake from t->outgoingHandshakes... */
1220    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1221        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1222}
1223void
1224tr_peerMgrStopTorrent( tr_peerMgr     * manager,
1225                       const uint8_t  * torrentHash)
1226{
1227    managerLock( manager );
1228
1229    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1230
1231    managerUnlock( manager );
1232}
1233
1234void
1235tr_peerMgrAddTorrent( tr_peerMgr * manager,
1236                      tr_torrent * tor )
1237{
1238    Torrent * t;
1239
1240    managerLock( manager );
1241
1242    assert( tor != NULL );
1243    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1244
1245    t = torrentConstructor( manager, tor );
1246    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1247
1248    managerUnlock( manager );
1249}
1250
1251void
1252tr_peerMgrRemoveTorrent( tr_peerMgr     * manager,
1253                         const uint8_t  * torrentHash )
1254{
1255    Torrent * t;
1256
1257    managerLock( manager );
1258
1259    t = getExistingTorrent( manager, torrentHash );
1260    assert( t != NULL );
1261    stopTorrent( t );
1262    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1263    torrentDestructor( t );
1264
1265    managerUnlock( manager );
1266}
1267
1268void
1269tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1270                               const uint8_t    * torrentHash,
1271                               int8_t           * tab,
1272                               int                tabCount )
1273{
1274    int i;
1275    const Torrent * t;
1276    const tr_torrent * tor;
1277    float interval;
1278
1279    managerLock( (tr_peerMgr*)manager );
1280
1281    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1282    tor = t->tor;
1283    interval = tor->info.pieceCount / (float)tabCount;
1284
1285    memset( tab, 0, tabCount );
1286
1287    for( i=0; i<tabCount; ++i )
1288    {
1289        const int piece = i * interval;
1290
1291        if( tor == NULL )
1292            tab[i] = 0;
1293        else if( tr_cpPieceIsComplete( tor->completion, piece ) )
1294            tab[i] = -1;
1295        else {
1296            int j, peerCount;
1297            const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1298            for( j=0; j<peerCount; ++j )
1299                if( tr_bitfieldHas( peers[j]->have, i ) )
1300                    ++tab[i];
1301        }
1302    }
1303
1304    managerUnlock( (tr_peerMgr*)manager );
1305}
1306
1307/* Returns the pieces that we and/or a connected peer has */
1308tr_bitfield*
1309tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1310                        const uint8_t    * torrentHash )
1311{
1312    int i, size;
1313    const Torrent * t;
1314    const tr_peer ** peers;
1315    tr_bitfield * pieces;
1316
1317    managerLock( (tr_peerMgr*)manager );
1318
1319    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1320    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1321    pieces = tr_bitfieldDup( tr_cpPieceBitfield( t->tor->completion ) );
1322    for( i=0; i<size; ++i )
1323        if( peers[i]->io != NULL )
1324            tr_bitfieldOr( pieces, peers[i]->have );
1325
1326    managerUnlock( (tr_peerMgr*)manager );
1327    return pieces;
1328}
1329
1330int
1331tr_peerMgrHasConnections( const tr_peerMgr * manager,
1332                          const uint8_t    * torrentHash )
1333{
1334    int ret;
1335    const Torrent * t;
1336    managerLock( (tr_peerMgr*)manager );
1337
1338    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1339    ret = t && tr_ptrArraySize( t->peers );
1340
1341    managerUnlock( (tr_peerMgr*)manager );
1342    return ret;
1343}
1344
1345void
1346tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1347                        const uint8_t    * torrentHash,
1348                        int              * setmePeersKnown,
1349                        int              * setmePeersConnected,
1350                        int              * setmePeersSendingToUs,
1351                        int              * setmePeersGettingFromUs,
1352                        int              * setmePeersFrom )
1353{
1354    int i, size;
1355    const Torrent * t;
1356    const tr_peer ** peers;
1357
1358    managerLock( (tr_peerMgr*)manager );
1359
1360    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1361    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1362
1363    *setmePeersKnown          = tr_ptrArraySize( t->pool );
1364    *setmePeersConnected      = 0;
1365    *setmePeersSendingToUs    = 0;
1366    *setmePeersGettingFromUs  = 0;
1367
1368    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1369        setmePeersFrom[i] = 0;
1370
1371    for( i=0; i<size; ++i )
1372    {
1373        const tr_peer * peer = peers[i];
1374        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1375
1376        if( peer->io == NULL ) /* not connected */
1377            continue;
1378
1379        ++*setmePeersConnected;
1380
1381        ++setmePeersFrom[atom->from];
1382
1383        if( peer->rateToPeer > 0.01 )
1384            ++*setmePeersGettingFromUs;
1385
1386        if( peer->rateToClient > 0.01 )
1387            ++*setmePeersSendingToUs;
1388    }
1389
1390    managerUnlock( (tr_peerMgr*)manager );
1391}
1392
1393struct tr_peer_stat *
1394tr_peerMgrPeerStats( const tr_peerMgr  * manager,
1395                     const uint8_t     * torrentHash,
1396                     int               * setmeCount UNUSED )
1397{
1398    int i, size;
1399    const Torrent * t;
1400    tr_peer ** peers;
1401    tr_peer_stat * ret;
1402
1403    assert( manager != NULL );
1404    managerLock( (tr_peerMgr*)manager );
1405
1406    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1407    peers = getConnectedPeers( (Torrent*)t, &size );
1408    ret = tr_new0( tr_peer_stat, size );
1409
1410    for( i=0; i<size; ++i )
1411    {
1412        const tr_peer * peer = peers[i];
1413        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1414        tr_peer_stat * stat = ret + i;
1415
1416        tr_netNtop( &peer->in_addr, stat->addr, sizeof(stat->addr) );
1417        strlcpy( stat->client, (peer->client ? peer->client : ""), sizeof(stat->client) );
1418        stat->port             = peer->port;
1419        stat->from             = atom->from;
1420        stat->progress         = peer->progress;
1421        stat->isEncrypted      = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1422        stat->uploadToRate     = peer->rateToPeer;
1423        stat->downloadFromRate = peer->rateToClient;
1424        stat->isDownloading    = stat->uploadToRate > 0.01;
1425        stat->isUploading      = stat->downloadFromRate > 0.01;
1426        stat->status           = peer->status;
1427    }
1428
1429    *setmeCount = size;
1430    tr_free( peers );
1431
1432    managerUnlock( (tr_peerMgr*)manager );
1433    return ret;
1434}
1435
1436/**
1437***
1438**/
1439
1440struct ChokeData
1441{
1442    tr_peer * peer;
1443    double rate;
1444    int preferred;
1445    int doUnchoke;
1446};
1447
1448static int
1449compareChoke( const void * va, const void * vb )
1450{
1451    int i;
1452    const struct ChokeData * a = va;
1453    const struct ChokeData * b = vb;
1454
1455    if(( i = (int)( 10 * ( a->rate - b->rate ))))
1456        return i;
1457
1458    if( a->preferred != b->preferred )
1459        return a->preferred ? -1 : 1;
1460
1461    return 0;
1462}
1463
1464static int
1465clientIsSnubbedBy( const tr_peer * peer )
1466{
1467    assert( peer != NULL );
1468
1469    return peer->peerSentPieceDataAt < (time(NULL) - SNUBBED_SEC);
1470}
1471
1472/**
1473***
1474**/
1475
1476static double
1477getWeightedThroughput( const tr_peer * peer )
1478{
1479    /* FIXME: tweak this? */
1480    return ( 1 * peer->rateToPeer )
1481         + ( 1 * peer->rateToClient );
1482}
1483
1484static void
1485rechoke( Torrent * t )
1486{
1487    int i, peerCount, size=0;
1488    const time_t fibrillationTime = time(NULL) - MIN_CHOKE_PERIOD_SEC;
1489    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1490    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1491
1492    assert( torrentIsLocked( t ) );
1493   
1494    /* sort the peers by preference and rate */
1495    for( i=0; i<peerCount; ++i )
1496    {
1497        tr_peer * peer = peers[i];
1498        struct ChokeData * node;
1499        if( peer->chokeChangedAt > fibrillationTime )
1500            continue;
1501
1502        node = &choke[size++];
1503        node->peer = peer;
1504        node->preferred = peer->peerIsInterested && !clientIsSnubbedBy(peer);
1505        node->rate = getWeightedThroughput( peer );
1506    }
1507
1508    qsort( choke, size, sizeof(struct ChokeData), compareChoke );
1509
1510    for( i=0; i<size && i<NUM_UNCHOKED_PEERS_PER_TORRENT; ++i )
1511        choke[i].doUnchoke = 1;
1512
1513    for( ; i<size; ++i ) {
1514        choke[i].doUnchoke = 1;
1515        if( choke[i].peer->peerIsInterested )
1516            break;
1517    }
1518
1519    for( i=0; i<size; ++i )
1520        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1521
1522    /* cleanup */
1523    tr_free( choke );
1524    tr_free( peers );
1525}
1526
1527static int
1528rechokePulse( void * vtorrent )
1529{
1530    Torrent * t = vtorrent;
1531    torrentLock( t );
1532    rechoke( t );
1533    torrentUnlock( t );
1534    return TRUE;
1535}
1536
1537/***
1538****
1539****  Life and Death
1540****
1541***/
1542
1543static int
1544shouldPeerBeClosed( const Torrent * t, const tr_peer * peer, int peerCount )
1545{
1546    const tr_torrent * tor = t->tor;
1547    const time_t now = time( NULL );
1548    const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1549
1550    /* if it's marked for purging, close it */
1551    if( peer->doPurge ) {
1552        tordbg( t, "purging peer %s because its doPurge flag is set", tr_peerIoAddrStr(&atom->addr,atom->port) );
1553        return TRUE;
1554    }
1555
1556    /* if we're both seeds and it's been long enough for a pex exchange, close it */
1557    if( 1 ) {
1558        const int clientIsSeed = tr_torrentIsSeed( tor );
1559        const int peerIsSeed = atom->flags & ADDED_F_SEED_FLAG;
1560        if( peerIsSeed && clientIsSeed && ( !tr_torrentIsPexEnabled(tor) || (now-atom->time>=30) ) ) {
1561            tordbg( t, "purging peer %s because we're both seeds", tr_peerIoAddrStr(&atom->addr,atom->port) );
1562            return TRUE;
1563        }
1564    }
1565
1566    /* disconnect if it's been too long since piece data has been transferred.
1567     * this is on a sliding scale based on number of available peers... */
1568    if( 1 ) {
1569        const int relaxStrictnessIfFewerThanN = (int)((MAX_CONNECTED_PEERS_PER_TORRENT * 0.9) + 0.5);
1570        /* if we have >= relaxIfFewerThan, strictness is 100%.
1571         * if we have zero connections, strictness is 0% */
1572        const double strictness = peerCount >= relaxStrictnessIfFewerThanN
1573            ? 1.0
1574            : peerCount / (double)relaxStrictnessIfFewerThanN;
1575        const int lo = MIN_UPLOAD_IDLE_SECS;
1576        const int hi = MAX_UPLOAD_IDLE_SECS;
1577        const int limit = lo + ((hi-lo) * strictness);
1578        const time_t then = peer->pieceDataActivityDate;
1579        const int idleTime = then ? (now-then) : 0;
1580        if( idleTime > limit ) {
1581            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
1582                       tr_peerIoAddrStr(&atom->addr,atom->port), idleTime );
1583            return TRUE;
1584        }
1585    }
1586
1587    return FALSE;
1588}
1589
1590static tr_peer **
1591getPeersToClose( Torrent * t, int * setmeSize )
1592{
1593    int i, peerCount, outsize;
1594    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( t->peers, &peerCount );
1595    struct tr_peer ** ret = tr_new( tr_peer*, peerCount );
1596
1597    assert( torrentIsLocked( t ) );
1598
1599    for( i=outsize=0; i<peerCount; ++i )
1600        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
1601            ret[outsize++] = peers[i];
1602
1603    *setmeSize = outsize;
1604    return ret;
1605}
1606
1607static int
1608compareCandidates( const void * va, const void * vb )
1609{
1610    const struct peer_atom * a = * (const struct peer_atom**) va;
1611    const struct peer_atom * b = * (const struct peer_atom**) vb;
1612    int i;
1613
1614    if(( i = tr_compareUint16( a->numFails, b->numFails )))
1615        return i;
1616
1617    if( a->time != b->time )
1618        return a->time < b->time ? -1 : 1;
1619
1620    return 0;
1621}
1622
1623static struct peer_atom **
1624getPeerCandidates( Torrent * t, int * setmeSize )
1625{
1626    int i, atomCount, retCount;
1627    struct peer_atom ** atoms;
1628    struct peer_atom ** ret;
1629    const time_t now = time( NULL );
1630    const int seed = tr_torrentIsSeed( t->tor );
1631
1632    assert( torrentIsLocked( t ) );
1633
1634    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &atomCount );
1635    ret = tr_new( struct peer_atom*, atomCount );
1636    for( i=retCount=0; i<atomCount; ++i )
1637    {
1638        int wait, minWait, maxWait;
1639        struct peer_atom * atom = atoms[i];
1640
1641        /* peer fed us too much bad data ... we only keep it around
1642         * now to weed it out in case someone sends it to us via pex */
1643        if( atom->myflags & MYFLAG_BANNED ) {
1644            tordbg( t, "RECONNECT peer %d (%s) is banned...",
1645                    i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1646            continue;
1647        }
1648
1649        /* we don't need two connections to the same peer... */
1650        if( peerIsInUse( t, &atom->addr ) ) {
1651            tordbg( t, "RECONNECT peer %d (%s) is in use..",
1652                    i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1653            continue;
1654        }
1655
1656        /* no need to connect if we're both seeds... */
1657        if( seed && (atom->flags & ADDED_F_SEED_FLAG) ) {
1658            tordbg( t, "RECONNECT peer %d (%s) is a seed and so are we..",
1659                    i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1660            continue;
1661        }
1662
1663        /* we're wasting our time trying to connect to this bozo. */
1664        if( atom->numFails > 10 ) {
1665            tordbg( t, "RECONNECT peer %d (%s) gives us nothing but failure.",
1666                    i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1667            continue;
1668        }
1669
1670        /* if we used this peer recently, give someone else a turn */
1671        minWait = 10; /* ten seconds */
1672        maxWait = (60 * 20); /* twenty minutes */
1673        wait = atom->numFails * 30; /* add 15 secs to the wait interval for each consecutive failure*/
1674        if( wait < minWait ) wait = minWait;
1675        if( wait > maxWait ) wait = maxWait;
1676        if( ( now - atom->time ) < wait ) {
1677            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
1678                    i, tr_peerIoAddrStr(&atom->addr,atom->port), wait );
1679            continue;
1680        }
1681
1682        ret[retCount++] = atom;
1683    }
1684
1685    qsort( ret, retCount, sizeof(struct peer_atom*), compareCandidates );
1686    *setmeSize = retCount;
1687    return ret;
1688}
1689
1690static int
1691reconnectPulse( void * vtorrent )
1692{
1693    Torrent * t = vtorrent;
1694
1695    torrentLock( t );
1696
1697    if( !t->isRunning )
1698    {
1699        removeAllPeers( t );
1700    }
1701    else
1702    {
1703        int i, nCandidates, nBad, nAdd;
1704        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
1705        struct tr_peer ** connections = getPeersToClose( t, &nBad );
1706        const int peerCount = tr_ptrArraySize( t->peers );
1707
1708        if( nBad || nCandidates )
1709            tordbg( t, "reconnect pulse for [%s]: %d bad connections, "
1710                       "%d connection candidates, %d atoms, max per pulse is %d",
1711                       t->tor->info.name, nBad, nCandidates,
1712                       tr_ptrArraySize(t->pool),
1713                       (int)MAX_RECONNECTIONS_PER_PULSE );
1714
1715        /* disconnect some peers.
1716           if we got transferred piece data, then they might be good peers,
1717           so reset their `numFails' weight to zero.  otherwise we connected
1718           to them fruitlessly, so mark it as another fail */
1719        for( i=0; i<nBad; ++i ) {
1720            tr_peer * peer = connections[i];
1721            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1722            if( peer->pieceDataActivityDate )
1723                atom->numFails = 0;
1724            else
1725                ++atom->numFails;
1726            removePeer( t, peer );
1727        }
1728
1729        /* add some new ones */
1730        nAdd = !peerCount ? MAX_CONNECTED_PEERS_PER_TORRENT
1731                          : MAX_RECONNECTIONS_PER_PULSE;
1732        for( i=0; i<nAdd && i<nCandidates && i<MAX_RECONNECTIONS_PER_PULSE; ++i )
1733        {
1734            tr_peerMgr * mgr = t->manager;
1735            struct peer_atom * atom = candidates[i];
1736            tr_peerIo * io;
1737            tr_handshake * handshake;
1738
1739            tordbg( t, "Starting an OUTGOING connection with %s",
1740                       tr_peerIoAddrStr( &atom->addr, atom->port ) );
1741
1742            io = tr_peerIoNewOutgoing( mgr->handle, &atom->addr, atom->port, t->hash );
1743
1744            handshake = tr_handshakeNew( io,
1745                                         mgr->handle->encryptionMode,
1746                                         myHandshakeDoneCB,
1747                                         mgr );
1748
1749            assert( tr_peerIoGetTorrentHash( io ) != NULL );
1750
1751            tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake, handshakeCompare );
1752
1753            atom->time = time( NULL );
1754        }
1755
1756        /* cleanup */
1757        tr_free( connections );
1758        tr_free( candidates );
1759    }
1760
1761    torrentUnlock( t );
1762    return TRUE;
1763}
Note: See TracBrowser for help on using the repository browser.