source: trunk/libtransmission/peer-mgr.c @ 6071

Last change on this file since 6071 was 6071, checked in by charles, 13 years ago

minor refactoring of tr_bitfield to (a) simplify the tests and (b) make things easier to read

  • Property svn:keywords set to Date Rev Author Id
File size: 53.3 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 6071 2008-06-07 01:44:54Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp, strstr */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "blocklist.h"
23#include "clients.h"
24#include "completion.h"
25#include "crypto.h"
26#include "handshake.h"
27#include "net.h"
28#include "peer-io.h"
29#include "peer-mgr.h"
30#include "peer-mgr-private.h"
31#include "peer-msgs.h"
32#include "ptrarray.h"
33#include "ratecontrol.h"
34#include "torrent.h"
35#include "trevent.h"
36#include "utils.h"
37
38enum
39{
40    /* how frequently to change which peers are choked */
41    RECHOKE_PERIOD_MSEC = (10 * 1000),
42
43    /* how frequently to refill peers' request lists */
44    REFILL_PERIOD_MSEC = 666,
45
46    /* following the BT spec, we consider ourselves `snubbed' if
47     * we're we don't get piece data from a peer in this long */
48    SNUBBED_SEC = 60,
49
50    /* when many peers are available, keep idle ones this long */
51    MIN_UPLOAD_IDLE_SECS = (60 * 3),
52
53    /* when few peers are available, keep idle ones this long */
54    MAX_UPLOAD_IDLE_SECS = (60 * 10),
55
56    /* how frequently to decide which peers live and die */
57    RECONNECT_PERIOD_MSEC = (2 * 1000),
58
59    /* max # of peers to ask fer per torrent per reconnect pulse */
60    MAX_RECONNECTIONS_PER_PULSE = 1,
61
62    /* max number of peers to ask for per second overall.
63     * this throttle is to avoid overloading the router */
64    MAX_CONNECTIONS_PER_SECOND = 8,
65
66    /* number of unchoked peers per torrent */
67    MAX_UNCHOKED_PEERS = 12,
68
69    /* number of bad pieces a peer is allowed to send before we ban them */
70    MAX_BAD_PIECES_PER_PEER = 3,
71
72    /* use for bitwise operations w/peer_atom.myflags */
73    MYFLAG_BANNED = 1,
74
75    /* unreachable for now... but not banned.  if they try to connect to us it's okay */
76    MYFLAG_UNREACHABLE = 2
77};
78
79
80/**
81***
82**/
83
84/* We keep one of these for every peer we know about, whether
85 * it's connected or not, so the struct must be small.
86 * When our current connections underperform, we dip back
87 * into this list for new ones. */
88struct peer_atom
89{   
90    uint8_t from;
91    uint8_t flags; /* these match the added_f flags */
92    uint8_t myflags; /* flags that aren't defined in added_f */
93    uint16_t port;
94    uint16_t numFails;
95    struct in_addr addr;
96    time_t time;
97    time_t piece_data_time;
98};
99
100typedef struct
101{
102    uint8_t hash[SHA_DIGEST_LENGTH];
103    tr_ptrArray * outgoingHandshakes; /* tr_handshake */
104    tr_ptrArray * pool; /* struct peer_atom */
105    tr_ptrArray * peers; /* tr_peer */
106    tr_timer * reconnectTimer;
107    tr_timer * rechokeTimer;
108    tr_timer * refillTimer;
109    tr_torrent * tor;
110    tr_peer * optimistic; /* the optimistic peer, or NULL if none */
111    tr_bitfield * requested;
112
113    unsigned int isRunning : 1;
114
115    struct tr_peerMgr * manager;
116}
117Torrent;
118
119struct tr_peerMgr
120{
121    tr_handle * handle;
122    tr_ptrArray * torrents; /* Torrent */
123    tr_ptrArray * incomingHandshakes; /* tr_handshake */
124};
125
126#define tordbg(t, fmt...) tr_deepLog( __FILE__, __LINE__, t->tor->info.name, ##fmt )
127
128/**
129***
130**/
131
132static void
133managerLock( const struct tr_peerMgr * manager )
134{
135    tr_globalLock( manager->handle );
136}
137static void
138managerUnlock( const struct tr_peerMgr * manager )
139{
140    tr_globalUnlock( manager->handle );
141}
142static void
143torrentLock( Torrent * torrent )
144{
145    managerLock( torrent->manager );
146}
147static void
148torrentUnlock( Torrent * torrent )
149{
150    managerUnlock( torrent->manager );
151}
152static int
153torrentIsLocked( const Torrent * t )
154{
155    return tr_globalIsLocked( t->manager->handle );
156}
157
158/**
159***
160**/
161
162static int
163compareAddresses( const struct in_addr * a, const struct in_addr * b )
164{
165    return tr_compareUint32( a->s_addr, b->s_addr );
166}
167
168static int
169handshakeCompareToAddr( const void * va, const void * vb )
170{
171    const tr_handshake * a = va;
172    return compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
173}
174
175static int
176handshakeCompare( const void * a, const void * b )
177{
178    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
179}
180
181static tr_handshake*
182getExistingHandshake( tr_ptrArray * handshakes, const struct in_addr * in_addr )
183{
184    return tr_ptrArrayFindSorted( handshakes,
185                                  in_addr,
186                                  handshakeCompareToAddr );
187}
188
189static int
190comparePeerAtomToAddress( const void * va, const void * vb )
191{
192    const struct peer_atom * a = va;
193    return compareAddresses( &a->addr, vb );
194}
195
196static int
197comparePeerAtoms( const void * va, const void * vb )
198{
199    const struct peer_atom * b = vb;
200    return comparePeerAtomToAddress( va, &b->addr );
201}
202
203/**
204***
205**/
206
207static int
208torrentCompare( const void * va, const void * vb )
209{
210    const Torrent * a = va;
211    const Torrent * b = vb;
212    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
213}
214
215static int
216torrentCompareToHash( const void * va, const void * vb )
217{
218    const Torrent * a = va;
219    const uint8_t * b_hash = vb;
220    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
221}
222
223static Torrent*
224getExistingTorrent( tr_peerMgr * manager, const uint8_t * hash )
225{
226    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
227                                             hash,
228                                             torrentCompareToHash );
229}
230
231static int
232peerCompare( const void * va, const void * vb )
233{
234    const tr_peer * a = va;
235    const tr_peer * b = vb;
236    return compareAddresses( &a->in_addr, &b->in_addr );
237}
238
239static int
240peerCompareToAddr( const void * va, const void * vb )
241{
242    const tr_peer * a = va;
243    return compareAddresses( &a->in_addr, vb );
244}
245
246static tr_peer*
247getExistingPeer( Torrent * torrent, const struct in_addr * in_addr )
248{
249    assert( torrentIsLocked( torrent ) );
250    assert( in_addr != NULL );
251
252    return (tr_peer*) tr_ptrArrayFindSorted( torrent->peers,
253                                             in_addr,
254                                             peerCompareToAddr );
255}
256
257static struct peer_atom*
258getExistingAtom( const Torrent * t, const struct in_addr * addr )
259{
260    assert( torrentIsLocked( t ) );
261    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
262}
263
264static int
265peerIsInUse( const Torrent * ct, const struct in_addr * addr )
266{
267    Torrent * t = (Torrent*) ct;
268
269    assert( torrentIsLocked ( t ) );
270
271    return getExistingPeer( t, addr )
272        || getExistingHandshake( t->outgoingHandshakes, addr )
273        || getExistingHandshake( t->manager->incomingHandshakes, addr );
274}
275
276static tr_peer*
277peerConstructor( const struct in_addr * in_addr )
278{
279    tr_peer * p;
280    p = tr_new0( tr_peer, 1 );
281    p->rcToClient = tr_rcInit( );
282    p->rcToPeer = tr_rcInit( );
283    memcpy( &p->in_addr, in_addr, sizeof(struct in_addr) );
284    return p;
285}
286
287static tr_peer*
288getPeer( Torrent * torrent, const struct in_addr * in_addr )
289{
290    tr_peer * peer;
291
292    assert( torrentIsLocked( torrent ) );
293
294    peer = getExistingPeer( torrent, in_addr );
295
296    if( peer == NULL ) {
297        peer = peerConstructor( in_addr );
298        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
299    }
300
301    return peer;
302}
303
304static void
305peerDestructor( tr_peer * peer )
306{
307    assert( peer != NULL );
308    assert( peer->msgs != NULL );
309
310    tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
311    tr_peerMsgsFree( peer->msgs );
312
313    tr_peerIoFree( peer->io );
314
315    tr_bitfieldFree( peer->have );
316    tr_bitfieldFree( peer->blame );
317    tr_rcClose( peer->rcToClient );
318    tr_rcClose( peer->rcToPeer );
319    tr_free( peer->client );
320    tr_free( peer );
321}
322
323static void
324removePeer( Torrent * t, tr_peer * peer )
325{
326    tr_peer * removed;
327    struct peer_atom * atom;
328
329    assert( torrentIsLocked( t ) );
330
331    atom = getExistingAtom( t, &peer->in_addr );
332    assert( atom != NULL );
333    atom->time = time( NULL );
334
335    removed = tr_ptrArrayRemoveSorted  ( t->peers, peer, peerCompare );
336    assert( removed == peer );
337    peerDestructor( removed );
338}
339
340static void
341removeAllPeers( Torrent * t )
342{
343    while( !tr_ptrArrayEmpty( t->peers ) )
344        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
345}
346
347static void
348torrentDestructor( Torrent * t )
349{
350    uint8_t hash[SHA_DIGEST_LENGTH];
351
352    assert( t != NULL );
353    assert( !t->isRunning );
354    assert( t->peers != NULL );
355    assert( torrentIsLocked( t ) );
356    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
357    assert( tr_ptrArrayEmpty( t->peers ) );
358
359    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
360
361    tr_timerFree( &t->reconnectTimer );
362    tr_timerFree( &t->rechokeTimer );
363    tr_timerFree( &t->refillTimer );
364
365    tr_bitfieldFree( t->requested );
366    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
367    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
368    tr_ptrArrayFree( t->peers, NULL );
369
370    tr_free( t );
371}
372
373static Torrent*
374torrentConstructor( tr_peerMgr * manager, tr_torrent * tor )
375{
376    Torrent * t;
377
378    t = tr_new0( Torrent, 1 );
379    t->manager = manager;
380    t->tor = tor;
381    t->pool = tr_ptrArrayNew( );
382    t->peers = tr_ptrArrayNew( );
383    t->outgoingHandshakes = tr_ptrArrayNew( );
384    t->requested = tr_bitfieldNew( tor->blockCount );
385    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
386
387    return t;
388}
389
390/**
391 * For explanation, see http://www.bittorrent.org/fast_extensions.html
392 * Also see the "test-allowed-set" unit test
393 */
394struct tr_bitfield *
395tr_peerMgrGenerateAllowedSet( const uint32_t         k,         /* number of pieces in set */
396                              const uint32_t         sz,        /* number of pieces in torrent */
397                              const uint8_t        * infohash,  /* torrent's SHA1 hash*/
398                              const struct in_addr * ip )       /* peer's address */
399{
400    uint8_t w[SHA_DIGEST_LENGTH + 4];
401    uint8_t x[SHA_DIGEST_LENGTH];
402    tr_bitfield_t * a;
403    uint32_t a_size;
404
405    *(uint32_t*)w = ntohl( htonl(ip->s_addr) & 0xffffff00 );   /* (1) */
406    memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );              /* (2) */
407    tr_sha1( x, w, sizeof( w ), NULL );                        /* (3) */
408
409    a = tr_bitfieldNew( sz );
410    a_size = 0;
411   
412    while( a_size < k )
413    {
414        int i;
415        for ( i=0; i<5 && a_size<k; ++i )                      /* (4) */
416        {
417            uint32_t j = i * 4;                                /* (5) */
418            uint32_t y = ntohl(*(uint32_t*)(x+j));             /* (6) */
419            uint32_t index = y % sz;                           /* (7) */
420            if ( !tr_bitfieldHas( a, index ) ) {               /* (8) */
421                tr_bitfieldAdd( a, index );                    /* (9) */
422                ++a_size;
423            }
424        }
425        tr_sha1( x, x, sizeof( x ), NULL );                    /* (3) */
426    }
427   
428    return a;
429}
430
431tr_peerMgr*
432tr_peerMgrNew( tr_handle * handle )
433{
434    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
435    m->handle = handle;
436    m->torrents = tr_ptrArrayNew( );
437    m->incomingHandshakes = tr_ptrArrayNew( );
438    return m;
439}
440
441void
442tr_peerMgrFree( tr_peerMgr * manager )
443{
444    managerLock( manager );
445
446    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
447     * the item from manager->handshakes, so this is a little roundabout... */
448    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
449        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
450    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
451
452    /* free the torrents. */
453    tr_ptrArrayFree( manager->torrents, (PtrArrayForeachFunc)torrentDestructor );
454
455    managerUnlock( manager );
456    tr_free( manager );
457}
458
459static tr_peer**
460getConnectedPeers( Torrent * t, int * setmeCount )
461{
462    int i, peerCount, connectionCount;
463    tr_peer **peers;
464    tr_peer **ret;
465
466    assert( torrentIsLocked( t ) );
467
468    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
469    ret = tr_new( tr_peer*, peerCount );
470
471    for( i=connectionCount=0; i<peerCount; ++i )
472        if( peers[i]->msgs != NULL )
473            ret[connectionCount++] = peers[i];
474
475    *setmeCount = connectionCount;
476    return ret;
477}
478
479static int
480clientIsDownloadingFrom( const tr_peer * peer )
481{
482    return peer->clientIsInterested && !peer->clientIsChoked;
483}
484
485static int
486clientIsUploadingTo( const tr_peer * peer )
487{
488    return peer->peerIsInterested && !peer->peerIsChoked;
489}
490
491/***
492****
493***/
494
495int
496tr_peerMgrPeerIsSeed( const tr_peerMgr       * mgr,
497                      const uint8_t          * torrentHash,
498                      const struct in_addr   * addr )
499{
500    int isSeed = FALSE;
501    const Torrent * t = NULL;
502    const struct peer_atom * atom = NULL;
503
504    t = getExistingTorrent( (tr_peerMgr*)mgr, torrentHash );
505    if( t )
506        atom = getExistingAtom( t, addr );
507    if( atom )
508        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
509
510    return isSeed;
511}
512
513/***
514****  Refill
515***/
516
517struct tr_refill_piece
518{
519    tr_priority_t priority;
520    int missingBlockCount;
521    uint16_t random;
522    uint32_t piece;
523    uint32_t peerCount;
524};
525
526static int
527compareRefillPiece (const void * aIn, const void * bIn)
528{
529    const struct tr_refill_piece * a = aIn;
530    const struct tr_refill_piece * b = bIn;
531
532    /* fewer missing pieces goes first */
533    if( a->missingBlockCount != b->missingBlockCount )
534        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
535   
536    /* if one piece has a higher priority, it goes first */
537    if( a->priority != b->priority )
538        return a->priority > b->priority ? -1 : 1;
539   
540    /* otherwise if one has fewer peers, it goes first */
541    if (a->peerCount != b->peerCount)
542        return a->peerCount < b->peerCount ? -1 : 1;
543
544    /* otherwise go with our random seed */
545    return tr_compareUint16( a->random, b->random );
546}
547
548static int
549isPieceInteresting( const tr_torrent  * tor,
550                    tr_piece_index_t    piece )
551{
552    if( tor->info.pieces[piece].dnd ) /* we don't want it */
553        return 0;
554
555    if( tr_cpPieceIsComplete( tor->completion, piece ) ) /* we have it */
556        return 0;
557
558    return 1;
559}
560
561static uint32_t*
562getPreferredPieces( Torrent     * t,
563                    uint32_t    * pieceCount )
564{
565    const tr_torrent * tor = t->tor;
566    const tr_info * inf = &tor->info;
567    tr_piece_index_t i;
568    uint32_t poolSize = 0;
569    uint32_t * pool = tr_new( uint32_t, inf->pieceCount );
570    int peerCount;
571    tr_peer** peers;
572
573    assert( torrentIsLocked( t ) );
574
575    peers = getConnectedPeers( t, &peerCount );
576
577    for( i=0; i<inf->pieceCount; ++i )
578        if( isPieceInteresting( tor, i ) )
579            pool[poolSize++] = i;
580
581    /* sort the pool from most interesting to least... */
582    if( poolSize > 1 )
583    {
584        uint32_t j;
585        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
586
587        for( j=0; j<poolSize; ++j )
588        {
589            int k;
590            const int piece = pool[j];
591            struct tr_refill_piece * setme = p + j;
592
593            setme->piece = piece;
594            setme->priority = inf->pieces[piece].priority;
595            setme->peerCount = 0;
596            setme->random = tr_rand( UINT16_MAX );
597            setme->missingBlockCount = tr_cpMissingBlocksInPiece( tor->completion, piece );
598
599            for( k=0; k<peerCount; ++k ) {
600                const tr_peer * peer = peers[k];
601                if( peer->peerIsInterested && !peer->clientIsChoked && tr_bitfieldHas( peer->have, piece ) )
602                    ++setme->peerCount;
603            }
604        }
605
606        qsort( p, poolSize, sizeof(struct tr_refill_piece), compareRefillPiece );
607
608        for( j=0; j<poolSize; ++j )
609            pool[j] = p[j].piece;
610
611        tr_free( p );
612    }
613
614    tr_free( peers );
615
616    *pieceCount = poolSize;
617    return pool;
618}
619
620static uint64_t*
621getPreferredBlocks( Torrent * t, tr_block_index_t * setmeCount )
622{
623    int s;
624    uint32_t i;
625    uint32_t pieceCount;
626    uint32_t blockCount;
627    uint32_t unreqCount[3], reqCount[3];
628    uint32_t * pieces;
629    uint64_t * ret, * walk;
630    uint64_t * unreq[3], *req[3];
631    const tr_torrent * tor = t->tor;
632
633    assert( torrentIsLocked( t ) );
634
635    pieces = getPreferredPieces( t, &pieceCount );
636
637    /**
638     * Now we walk through those preferred pieces to find all the blocks
639     * are still missing from them.  We put unrequested blocks first,
640     * of course, but by including requested blocks afterwards, endgame
641     * handling happens naturally.
642     *
643     * By doing this once per priority we also effectively get an endgame
644     * mode for each priority level.  The helps keep high priority files
645     * from getting stuck at 99% due of unresponsive peers.
646     */
647
648    /* make temporary bins for the four tiers of blocks */
649    for( i=0; i<3; ++i ) {
650        req[i] = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
651        reqCount[i] = 0;
652        unreq[i] = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
653        unreqCount[i] = 0;
654    }
655
656    /* sort the blocks into our temp bins */
657    for( i=blockCount=0; i<pieceCount; ++i )
658    {
659        const tr_piece_index_t index = pieces[i];
660        const int priorityIndex = tor->info.pieces[index].priority + 1;
661        const tr_block_index_t begin = tr_torPieceFirstBlock( tor, index );
662        const tr_block_index_t end = begin + tr_torPieceCountBlocks( tor, index );
663        tr_block_index_t block;
664
665        assert( tr_bitfieldTestFast( t->requested, end-1 ) );
666
667        for( block=begin; block<end; ++block )
668        {
669            if( tr_cpBlockIsComplete( tor->completion, block ) )
670                continue;
671
672            ++blockCount;
673
674            if( tr_bitfieldHasFast( t->requested, block ) )
675            {
676                const uint32_t n = reqCount[priorityIndex]++;
677                req[priorityIndex][n] = block;
678            }
679            else
680            {
681                const uint32_t n = unreqCount[priorityIndex]++;
682                unreq[priorityIndex][n] = block;
683            }
684        }
685    }
686
687    /* join the bins together, going from highest priority to lowest so
688     * the the blocks we want to request first will be first in the list */
689    ret = walk = tr_new( uint64_t, blockCount );
690    for( s=2; s>=0; --s ) {
691        memcpy( walk, unreq[s], sizeof(uint64_t) * unreqCount[s] );
692        walk += unreqCount[s];
693        memcpy( walk, req[s], sizeof(uint64_t) * reqCount[s] );
694        walk += reqCount[s];
695    }
696    assert( ( walk - ret ) == ( int )blockCount );
697    *setmeCount = blockCount;
698
699    /* cleanup */
700    tr_free( pieces );
701    for( i=0; i<3; ++i ) {
702        tr_free( unreq[i] );
703        tr_free( req[i] );
704    }
705    return ret;
706}
707
708static tr_peer**
709getPeersUploadingToClient( Torrent * t, int * setmeCount )
710{
711    int i;
712    int peerCount = 0;
713    int retCount = 0;
714    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
715    tr_peer ** ret = tr_new( tr_peer*, peerCount );
716
717    /* get a list of peers we're downloading from */
718    for( i=0; i<peerCount; ++i )
719        if( clientIsDownloadingFrom( peers[i] ) )
720            ret[retCount++] = peers[i];
721
722    /* pick a different starting point each time so all peers
723     * get a chance at the first blocks in the queue */
724    if( retCount ) {
725        tr_peer ** tmp = tr_new( tr_peer*, retCount );
726        i = tr_rand( retCount );
727        memcpy( tmp, ret, sizeof(tr_peer*) * retCount );
728        memcpy( ret, tmp+i, sizeof(tr_peer*) * (retCount-i) );
729        memcpy( ret+(retCount-i), tmp, sizeof(tr_peer*) * i );
730        tr_free( tmp );
731    }
732
733    *setmeCount = retCount;
734    return ret;
735}
736
737static int
738refillPulse( void * vtorrent )
739{
740    Torrent * t = vtorrent;
741    tr_torrent * tor = t->tor;
742    tr_block_index_t i;
743    int peerCount;
744    tr_peer ** peers;
745    tr_block_index_t blockCount;
746    uint64_t * blocks;
747
748    if( !t->isRunning )
749        return TRUE;
750    if( tr_torrentIsSeed( t->tor ) )
751        return TRUE;
752
753    torrentLock( t );
754    tordbg( t, "Refilling Request Buffers..." );
755
756    blocks = getPreferredBlocks( t, &blockCount );
757    peers = getPeersUploadingToClient( t, &peerCount );
758
759    for( i=0; peerCount && i<blockCount; ++i )
760    {
761        int j;
762
763        const tr_block_index_t block = blocks[i];
764        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
765        const uint32_t begin = (block * tor->blockSize) - (index * tor->info.pieceSize);
766        const uint32_t length = tr_torBlockCountBytes( tor, block );
767
768        assert( tr_torrentReqIsValid( tor, index, begin, length ) );
769        assert( _tr_block( tor, index, begin ) == block );
770        assert( begin < tr_torPieceCountBytes( tor, index ) );
771        assert( (begin + length) <= tr_torPieceCountBytes( tor, index ) );
772
773        /* find a peer who can ask for this block */
774        for( j=0; j<peerCount; )
775        {
776            const int val = tr_peerMsgsAddRequest( peers[j]->msgs, index, begin, length );
777            switch( val )
778            {
779                case TR_ADDREQ_FULL: 
780                case TR_ADDREQ_CLIENT_CHOKED:
781                    memmove( peers+j, peers+j+1, sizeof(tr_peer*)*(--peerCount-j) );
782                    break;
783
784                case TR_ADDREQ_MISSING: 
785                case TR_ADDREQ_DUPLICATE: 
786                    ++j;
787                    break;
788
789                case TR_ADDREQ_OK:
790                    tr_bitfieldAdd( t->requested, block );
791                    j = peerCount;
792                    break;
793
794                default:
795                    assert( 0 && "unhandled value" );
796                    break;
797            }
798        }
799    }
800
801    /* cleanup */
802    tr_free( peers );
803    tr_free( blocks );
804
805    t->refillTimer = NULL;
806    torrentUnlock( t );
807    return FALSE;
808}
809
810static void
811broadcastClientHave( Torrent * t, uint32_t index )
812{
813    int i, size;
814    tr_peer ** peers;
815
816    assert( torrentIsLocked( t ) );
817
818    peers = getConnectedPeers( t, &size );
819    for( i=0; i<size; ++i )
820        tr_peerMsgsHave( peers[i]->msgs, index );
821    tr_free( peers );
822}
823
824static void
825broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
826{
827    int i, size;
828    tr_peer ** peers;
829
830    assert( torrentIsLocked( t ) );
831
832    peers = getConnectedPeers( t, &size );
833    for( i=0; i<size; ++i )
834        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
835    tr_free( peers );
836}
837
838static void
839addStrike( Torrent * t, tr_peer * peer )
840{
841    tordbg( t, "increasing peer %s strike count to %d", tr_peerIoAddrStr(&peer->in_addr,peer->port), peer->strikes+1 );
842
843    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
844    {
845        struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
846        atom->myflags |= MYFLAG_BANNED;
847        peer->doPurge = 1;
848        tordbg( t, "banning peer %s", tr_peerIoAddrStr(&atom->addr,atom->port) );
849    }
850}
851
852static void
853msgsCallbackFunc( void * vpeer, void * vevent, void * vt )
854{
855    tr_peer * peer = vpeer;
856    Torrent * t = (Torrent *) vt;
857    const tr_peermsgs_event * e = (const tr_peermsgs_event *) vevent;
858
859    torrentLock( t );
860
861    switch( e->eventType )
862    {
863        case TR_PEERMSG_NEED_REQ:
864            if( t->refillTimer == NULL )
865                t->refillTimer = tr_timerNew( t->manager->handle,
866                                              refillPulse, t,
867                                              REFILL_PERIOD_MSEC );
868            break;
869
870        case TR_PEERMSG_CANCEL:
871            tr_bitfieldRem( t->requested, _tr_block( t->tor, e->pieceIndex, e->offset ) );
872            break;
873
874        case TR_PEERMSG_PIECE_DATA: {
875            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
876            atom->piece_data_time = time( NULL );
877            break;
878        }
879
880        case TR_PEERMSG_CLIENT_HAVE:
881            broadcastClientHave( t, e->pieceIndex );
882            tr_torrentRecheckCompleteness( t->tor );
883            break;
884
885        case TR_PEERMSG_PEER_PROGRESS: {
886            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
887            const int peerIsSeed = e->progress >= 1.0;
888            if( peerIsSeed ) {
889                tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr(&atom->addr,atom->port) );
890                atom->flags |= ADDED_F_SEED_FLAG;
891            } else {
892                tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr(&atom->addr,atom->port) );
893                atom->flags &= ~ADDED_F_SEED_FLAG;
894            } break;
895        }
896
897        case TR_PEERMSG_CLIENT_BLOCK:
898            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
899            break;
900
901        case TR_PEERMSG_ERROR:
902            if( TR_ERROR_IS_IO( e->err ) ) {
903                t->tor->error = e->err;
904                tr_strlcpy( t->tor->errorString, tr_errorString( e->err ), sizeof(t->tor->errorString) );
905                tr_torrentStop( t->tor );
906            } else if( e->err == TR_ERROR_ASSERT ) {
907                addStrike( t, peer );
908            }
909            peer->doPurge = 1;
910            break;
911
912        default:
913            assert(0);
914    }
915
916    torrentUnlock( t );
917}
918
919static void
920ensureAtomExists( Torrent * t, const struct in_addr * addr, uint16_t port, uint8_t flags, uint8_t from )
921{
922    if( getExistingAtom( t, addr ) == NULL )
923    {
924        struct peer_atom * a;
925        a = tr_new0( struct peer_atom, 1 );
926        a->addr = *addr;
927        a->port = port;
928        a->flags = flags;
929        a->from = from;
930        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr(&a->addr,a->port) );
931        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
932    }
933}
934
935static int
936getMaxPeerCount( const tr_torrent * tor UNUSED )
937{
938    return tor->maxConnectedPeers;
939}
940
941/* FIXME: this is kind of a mess. */
942static void
943myHandshakeDoneCB( tr_handshake    * handshake,
944                   tr_peerIo       * io,
945                   int               isConnected,
946                   const uint8_t   * peer_id,
947                   void            * vmanager )
948{
949    int ok = isConnected;
950    uint16_t port;
951    const struct in_addr * addr;
952    tr_peerMgr * manager = (tr_peerMgr*) vmanager;
953    Torrent * t;
954    tr_handshake * ours;
955
956    assert( io != NULL );
957    assert( isConnected==0 || isConnected==1 );
958
959    t = tr_peerIoHasTorrentHash( io )
960        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
961        : NULL;
962
963    if( tr_peerIoIsIncoming ( io ) )
964        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
965                                        handshake, handshakeCompare );
966    else if( t != NULL )
967        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
968                                        handshake, handshakeCompare );
969    else
970        ours = handshake;
971
972    assert( ours != NULL );
973    assert( ours == handshake );
974
975    if( t != NULL )
976        torrentLock( t );
977
978    addr = tr_peerIoGetAddress( io, &port );
979
980    if( !ok || !t || !t->isRunning )
981    {
982        if( t ) {
983            struct peer_atom * atom = getExistingAtom( t, addr );
984            if( atom )
985                ++atom->numFails;
986        }
987
988        tr_peerIoFree( io );
989    }
990    else /* looking good */
991    {
992        struct peer_atom * atom;
993        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
994        atom = getExistingAtom( t, addr );
995
996        if( atom->myflags & MYFLAG_BANNED )
997        {
998            tordbg( t, "banned peer %s tried to reconnect", tr_peerIoAddrStr(&atom->addr,atom->port) );
999            tr_peerIoFree( io );
1000        }
1001        else if( tr_ptrArraySize( t->peers ) >= getMaxPeerCount( t->tor ) )
1002        {
1003            tr_peerIoFree( io );
1004        }
1005        else
1006        {
1007            tr_peer * peer = getExistingPeer( t, addr );
1008
1009            if( peer != NULL ) /* we already have this peer */
1010            {
1011                tr_peerIoFree( io );
1012            }
1013            else
1014            {
1015                peer = getPeer( t, addr );
1016                tr_free( peer->client );
1017
1018                if( !peer_id )
1019                    peer->client = NULL;
1020                else {
1021                    char client[128];
1022                    tr_clientForId( client, sizeof( client ), peer_id );
1023                    peer->client = tr_strdup( client );
1024                }
1025                peer->port = port;
1026                peer->io = io;
1027                peer->msgs = tr_peerMsgsNew( t->tor, peer, msgsCallbackFunc, t, &peer->msgsTag );
1028                atom->time = time( NULL );
1029            }
1030        }
1031    }
1032
1033    if( t != NULL )
1034        torrentUnlock( t );
1035}
1036
1037void
1038tr_peerMgrAddIncoming( tr_peerMgr      * manager,
1039                       struct in_addr  * addr,
1040                       uint16_t          port,
1041                       int               socket )
1042{
1043    managerLock( manager );
1044
1045    if( tr_blocklistHasAddress( manager->handle, addr ) )
1046    {
1047        tr_dbg( "Banned IP address \"%s\" tried to connect to us",
1048                inet_ntoa( *addr ) );
1049        tr_netClose( socket );
1050    }
1051    else if( getExistingHandshake( manager->incomingHandshakes, addr ) )
1052    {
1053        tr_netClose( socket );
1054    }
1055    else /* we don't have a connetion to them yet... */
1056    {
1057        tr_peerIo * io;
1058        tr_handshake * handshake;
1059
1060        io = tr_peerIoNewIncoming( manager->handle, addr, port, socket );
1061
1062        handshake = tr_handshakeNew( io,
1063                                     manager->handle->encryptionMode,
1064                                     myHandshakeDoneCB,
1065                                     manager );
1066
1067        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake, handshakeCompare );
1068    }
1069
1070    managerUnlock( manager );
1071}
1072
1073void
1074tr_peerMgrAddPex( tr_peerMgr     * manager,
1075                  const uint8_t  * torrentHash,
1076                  uint8_t          from,
1077                  const tr_pex   * pex )
1078{
1079    Torrent * t;
1080    managerLock( manager );
1081
1082    t = getExistingTorrent( manager, torrentHash );
1083    if( !tr_blocklistHasAddress( t->manager->handle, &pex->in_addr ) )
1084        ensureAtomExists( t, &pex->in_addr, pex->port, pex->flags, from );
1085
1086    managerUnlock( manager );
1087}
1088
1089tr_pex *
1090tr_peerMgrCompactToPex( const void  * compact,
1091                        size_t        compactLen,
1092                        const char  * added_f,
1093                        size_t      * pexCount )
1094{
1095    size_t i;
1096    size_t n = compactLen / 6;
1097    const uint8_t * walk = compact;
1098    tr_pex * pex = tr_new0( tr_pex, n );
1099    for( i=0; i<n; ++i ) {
1100        memcpy( &pex[i].in_addr, walk, 4 ); walk += 4;
1101        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1102        if( added_f )
1103            pex[i].flags = added_f[i];
1104    }
1105    *pexCount = n;
1106    return pex;
1107}
1108
1109/**
1110***
1111**/
1112
1113void
1114tr_peerMgrSetBlame( tr_peerMgr     * manager,
1115                    const uint8_t  * torrentHash,
1116                    int              pieceIndex,
1117                    int              success )
1118{
1119    if( !success )
1120    {
1121        int peerCount, i;
1122        Torrent * t = getExistingTorrent( manager, torrentHash );
1123        tr_peer ** peers;
1124
1125        assert( torrentIsLocked( t ) );
1126
1127        peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1128        for( i=0; i<peerCount; ++i )
1129        {
1130            tr_peer * peer = peers[i];
1131            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1132            {
1133                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1134                           tr_peerIoAddrStr(&peer->in_addr,peer->port),
1135                           pieceIndex, (int)peer->strikes+1 );
1136                addStrike( t, peer );
1137            }
1138        }
1139    }
1140}
1141
1142int
1143tr_pexCompare( const void * va, const void * vb )
1144{
1145    const tr_pex * a = (const tr_pex *) va;
1146    const tr_pex * b = (const tr_pex *) vb;
1147    int i = memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
1148    if( i ) return i;
1149    if( a->port < b->port ) return -1;
1150    if( a->port > b->port ) return 1;
1151    return 0;
1152}
1153
1154int tr_pexCompare( const void * a, const void * b );
1155
1156static int
1157peerPrefersCrypto( const tr_peer * peer )
1158{
1159    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1160        return TRUE;
1161
1162    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1163        return FALSE;
1164
1165    return tr_peerIoIsEncrypted( peer->io );
1166};
1167
1168int
1169tr_peerMgrGetPeers( tr_peerMgr      * manager,
1170                    const uint8_t   * torrentHash,
1171                    tr_pex         ** setme_pex )
1172{
1173    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1174    int i, peerCount;
1175    const tr_peer ** peers;
1176    tr_pex * pex;
1177    tr_pex * walk;
1178
1179    managerLock( manager );
1180
1181    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1182    pex = walk = tr_new( tr_pex, peerCount );
1183
1184    for( i=0; i<peerCount; ++i, ++walk )
1185    {
1186        const tr_peer * peer = peers[i];
1187
1188        walk->in_addr = peer->in_addr;
1189
1190        walk->port = peer->port;
1191
1192        walk->flags = 0;
1193        if( peerPrefersCrypto(peer) )  walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1194        if( peer->progress >= 1.0 )    walk->flags |= ADDED_F_SEED_FLAG;
1195    }
1196
1197    assert( ( walk - pex ) == peerCount );
1198    qsort( pex, peerCount, sizeof(tr_pex), tr_pexCompare );
1199    *setme_pex = pex;
1200
1201    managerUnlock( manager );
1202
1203    return peerCount;
1204}
1205
1206static int reconnectPulse( void * vtorrent );
1207static int rechokePulse( void * vtorrent );
1208
1209void
1210tr_peerMgrStartTorrent( tr_peerMgr     * manager,
1211                        const uint8_t  * torrentHash )
1212{
1213    Torrent * t;
1214
1215    managerLock( manager );
1216
1217    t = getExistingTorrent( manager, torrentHash );
1218
1219    assert( t != NULL );
1220    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1221    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1222
1223    if( !t->isRunning )
1224    {
1225        t->isRunning = 1;
1226
1227        t->reconnectTimer = tr_timerNew( t->manager->handle,
1228                                         reconnectPulse, t,
1229                                         RECONNECT_PERIOD_MSEC );
1230
1231        t->rechokeTimer = tr_timerNew( t->manager->handle,
1232                                       rechokePulse, t,
1233                                       RECHOKE_PERIOD_MSEC );
1234
1235        reconnectPulse( t );
1236
1237        rechokePulse( t );
1238    }
1239
1240    managerUnlock( manager );
1241}
1242
1243static void
1244stopTorrent( Torrent * t )
1245{
1246    assert( torrentIsLocked( t ) );
1247
1248    t->isRunning = 0;
1249    tr_timerFree( &t->rechokeTimer );
1250    tr_timerFree( &t->reconnectTimer );
1251
1252    /* disconnect the peers. */
1253    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)peerDestructor );
1254    tr_ptrArrayClear( t->peers );
1255
1256    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1257     * which removes the handshake from t->outgoingHandshakes... */
1258    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1259        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1260}
1261void
1262tr_peerMgrStopTorrent( tr_peerMgr     * manager,
1263                       const uint8_t  * torrentHash)
1264{
1265    managerLock( manager );
1266
1267    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1268
1269    managerUnlock( manager );
1270}
1271
1272void
1273tr_peerMgrAddTorrent( tr_peerMgr * manager,
1274                      tr_torrent * tor )
1275{
1276    Torrent * t;
1277
1278    managerLock( manager );
1279
1280    assert( tor != NULL );
1281    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1282
1283    t = torrentConstructor( manager, tor );
1284    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1285
1286    managerUnlock( manager );
1287}
1288
1289void
1290tr_peerMgrRemoveTorrent( tr_peerMgr     * manager,
1291                         const uint8_t  * torrentHash )
1292{
1293    Torrent * t;
1294
1295    managerLock( manager );
1296
1297    t = getExistingTorrent( manager, torrentHash );
1298    assert( t != NULL );
1299    stopTorrent( t );
1300    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1301    torrentDestructor( t );
1302
1303    managerUnlock( manager );
1304}
1305
1306void
1307tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1308                               const uint8_t    * torrentHash,
1309                               int8_t           * tab,
1310                               int                tabCount )
1311{
1312    int i;
1313    const Torrent * t;
1314    const tr_torrent * tor;
1315    float interval;
1316
1317    managerLock( (tr_peerMgr*)manager );
1318
1319    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1320    tor = t->tor;
1321    interval = tor->info.pieceCount / (float)tabCount;
1322
1323    memset( tab, 0, tabCount );
1324
1325    for( i=0; i<tabCount; ++i )
1326    {
1327        const int piece = i * interval;
1328
1329        if( tor == NULL )
1330            tab[i] = 0;
1331        else if( tr_cpPieceIsComplete( tor->completion, piece ) )
1332            tab[i] = -1;
1333        else {
1334            int j, peerCount;
1335            const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1336            for( j=0; j<peerCount; ++j )
1337                if( tr_bitfieldHas( peers[j]->have, i ) )
1338                    ++tab[i];
1339        }
1340    }
1341
1342    managerUnlock( (tr_peerMgr*)manager );
1343}
1344
1345/* Returns the pieces that are available from peers */
1346tr_bitfield*
1347tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1348                        const uint8_t    * torrentHash )
1349{
1350    int i, size;
1351    Torrent * t;
1352    tr_peer ** peers;
1353    tr_bitfield * pieces;
1354    managerLock( (tr_peerMgr*)manager );
1355
1356    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1357    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1358    peers = getConnectedPeers( t, &size );
1359    for( i=0; i<size; ++i )
1360        tr_bitfieldOr( pieces, peers[i]->have );
1361
1362    managerUnlock( (tr_peerMgr*)manager );
1363    tr_free( peers );
1364    return pieces;
1365}
1366
1367int
1368tr_peerMgrHasConnections( const tr_peerMgr * manager,
1369                          const uint8_t    * torrentHash )
1370{
1371    int ret;
1372    const Torrent * t;
1373    managerLock( (tr_peerMgr*)manager );
1374
1375    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1376    ret = t && tr_ptrArraySize( t->peers );
1377
1378    managerUnlock( (tr_peerMgr*)manager );
1379    return ret;
1380}
1381
1382void
1383tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1384                        const uint8_t    * torrentHash,
1385                        int              * setmePeersKnown,
1386                        int              * setmePeersConnected,
1387                        int              * setmeSeedsConnected,
1388                        int              * setmePeersSendingToUs,
1389                        int              * setmePeersGettingFromUs,
1390                        int              * setmePeersFrom )
1391{
1392    int i, size;
1393    const Torrent * t;
1394    const tr_peer ** peers;
1395
1396    managerLock( (tr_peerMgr*)manager );
1397
1398    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1399    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1400
1401    *setmePeersKnown          = tr_ptrArraySize( t->pool );
1402    *setmePeersConnected      = 0;
1403    *setmeSeedsConnected      = 0;
1404    *setmePeersSendingToUs    = 0;
1405    *setmePeersGettingFromUs  = 0;
1406
1407    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1408        setmePeersFrom[i] = 0;
1409
1410    for( i=0; i<size; ++i )
1411    {
1412        const tr_peer * peer = peers[i];
1413        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1414
1415        if( peer->io == NULL ) /* not connected */
1416            continue;
1417
1418        ++*setmePeersConnected;
1419
1420        ++setmePeersFrom[atom->from];
1421
1422        if( clientIsDownloadingFrom( peer ) )
1423            ++*setmePeersSendingToUs;
1424
1425        if( clientIsUploadingTo( peer ) )
1426            ++*setmePeersGettingFromUs;
1427
1428        if( atom->flags & ADDED_F_SEED_FLAG )
1429            ++*setmeSeedsConnected;
1430    }
1431
1432    managerUnlock( (tr_peerMgr*)manager );
1433}
1434
1435struct tr_peer_stat *
1436tr_peerMgrPeerStats( const tr_peerMgr  * manager,
1437                     const uint8_t     * torrentHash,
1438                     int               * setmeCount UNUSED )
1439{
1440    int i, size;
1441    const Torrent * t;
1442    tr_peer ** peers;
1443    tr_peer_stat * ret;
1444
1445    assert( manager != NULL );
1446    managerLock( (tr_peerMgr*)manager );
1447
1448    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1449    peers = getConnectedPeers( (Torrent*)t, &size );
1450    ret = tr_new0( tr_peer_stat, size );
1451
1452    for( i=0; i<size; ++i )
1453    {
1454        char * pch;
1455        const tr_peer * peer = peers[i];
1456        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1457        tr_peer_stat * stat = ret + i;
1458
1459        tr_netNtop( &peer->in_addr, stat->addr, sizeof(stat->addr) );
1460        tr_strlcpy( stat->client, (peer->client ? peer->client : ""), sizeof(stat->client) );
1461        stat->port               = peer->port;
1462        stat->from               = atom->from;
1463        stat->progress           = peer->progress;
1464        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1465        stat->uploadToRate       = peer->rateToPeer;
1466        stat->downloadFromRate   = peer->rateToClient;
1467        stat->peerIsChoked       = peer->peerIsChoked;
1468        stat->peerIsInterested   = peer->peerIsInterested;
1469        stat->clientIsChoked     = peer->clientIsChoked;
1470        stat->clientIsInterested = peer->clientIsInterested;
1471        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1472        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1473        stat->isUploadingTo      = clientIsUploadingTo( peer );
1474
1475        pch = stat->flagStr;
1476        if( t->optimistic == peer ) *pch++ = 'O';
1477        if( stat->isDownloadingFrom ) *pch++ = 'D';
1478        else if( stat->clientIsInterested ) *pch++ = 'd';
1479        if( stat->isUploadingTo ) *pch++ = 'U';
1480        else if( stat->peerIsInterested ) *pch++ = 'u';
1481        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
1482        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1483        if( stat->isEncrypted ) *pch++ = 'E';
1484        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1485        if( stat->isIncoming ) *pch++ = 'I';
1486        *pch = '\0';
1487    }
1488
1489    *setmeCount = size;
1490    tr_free( peers );
1491
1492    managerUnlock( (tr_peerMgr*)manager );
1493    return ret;
1494}
1495
1496/**
1497***
1498**/
1499
1500struct ChokeData
1501{
1502    uint8_t doUnchoke;
1503    uint8_t isInterested;
1504    uint32_t rate;
1505    tr_peer * peer;
1506};
1507
1508static int
1509compareChoke( const void * va, const void * vb )
1510{
1511    const struct ChokeData * a = va;
1512    const struct ChokeData * b = vb;
1513    return -tr_compareUint32( a->rate, b->rate );
1514}
1515
1516static int
1517isNew( const tr_peer * peer )
1518{
1519    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1520}
1521
1522static int
1523isSame( const tr_peer * peer )
1524{
1525    return peer && peer->client && strstr( peer->client, "Transmission" );
1526}
1527
1528/**
1529***
1530**/
1531
1532static int
1533getWeightedRate( const tr_peer * peer, int clientIsSeed )
1534{
1535    return (int)( 10.0 * ( clientIsSeed ? peer->rateToPeer
1536                                        : peer->rateToClient ) );
1537}
1538
1539static void
1540rechoke( Torrent * t )
1541{
1542    int i, peerCount, size, unchokedInterested;
1543    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1544    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1545    const int clientIsSeed = tr_torrentIsSeed( t->tor );
1546
1547    assert( torrentIsLocked( t ) );
1548   
1549    /* sort the peers by preference and rate */
1550    for( i=0, size=0; i<peerCount; ++i )
1551    {
1552        tr_peer * peer = peers[i];
1553        if( peer->progress >= 1.0 ) /* choke all seeds */
1554            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1555        else {
1556            struct ChokeData * node = &choke[size++];
1557            node->peer = peer;
1558            node->isInterested = peer->peerIsInterested;
1559            node->rate = getWeightedRate( peer, clientIsSeed );
1560        }
1561    }
1562
1563    qsort( choke, size, sizeof(struct ChokeData), compareChoke );
1564
1565    /**
1566     * Reciprocation and number of uploads capping is managed by unchoking
1567     * the N peers which have the best upload rate and are interested.
1568     * This maximizes the client's download rate. These N peers are
1569     * referred to as downloaders, because they are interested in downloading
1570     * from the client.
1571     *
1572     * Peers which have a better upload rate (as compared to the downloaders)
1573     * but aren't interested get unchoked. If they become interested, the
1574     * downloader with the worst upload rate gets choked. If a client has
1575     * a complete file, it uses its upload rate rather than its download
1576     * rate to decide which peers to unchoke.
1577     */
1578    unchokedInterested = 0;
1579    for( i=0; i<size && unchokedInterested<MAX_UNCHOKED_PEERS; ++i ) {
1580        choke[i].doUnchoke = 1;
1581        if( choke[i].isInterested )
1582            ++unchokedInterested;
1583    }
1584
1585    /* optimistic unchoke */
1586    if( i < size )
1587    {
1588        int n;
1589        struct ChokeData * c;
1590        tr_ptrArray * randPool = tr_ptrArrayNew( );
1591
1592        for( ; i<size; ++i )
1593        {
1594            if( choke[i].isInterested )
1595            {
1596                const tr_peer * peer = choke[i].peer;
1597                int x=1, y;
1598                if( isNew( peer ) ) x *= 3;
1599                if( isSame( peer ) ) x *= 3;
1600                for( y=0; y<x; ++y )
1601                    tr_ptrArrayAppend( randPool, choke );
1602            }
1603        }
1604
1605        if(( n = tr_ptrArraySize( randPool )))
1606        {
1607            c = tr_ptrArrayNth( randPool, tr_rand( n ));
1608            c->doUnchoke = 1;
1609            t->optimistic = c->peer;
1610        }
1611
1612        tr_ptrArrayFree( randPool, NULL );
1613    }
1614
1615    for( i=0; i<size; ++i )
1616        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1617
1618    /* cleanup */
1619    tr_free( choke );
1620    tr_free( peers );
1621}
1622
1623static int
1624rechokePulse( void * vtorrent )
1625{
1626    Torrent * t = vtorrent;
1627    torrentLock( t );
1628    rechoke( t );
1629    torrentUnlock( t );
1630    return TRUE;
1631}
1632
1633/***
1634****
1635****  Life and Death
1636****
1637***/
1638
1639static int
1640shouldPeerBeClosed( const Torrent * t, const tr_peer * peer, int peerCount )
1641{
1642    const tr_torrent * tor = t->tor;
1643    const time_t now = time( NULL );
1644    const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1645
1646    /* if it's marked for purging, close it */
1647    if( peer->doPurge ) {
1648        tordbg( t, "purging peer %s because its doPurge flag is set", tr_peerIoAddrStr(&atom->addr,atom->port) );
1649        return TRUE;
1650    }
1651
1652    /* if we're seeding and the peer has everything we have,
1653     * and enough time has passed for a pex exchange, then disconnect */
1654    if( tr_torrentIsSeed( tor ) ) {
1655        int peerHasEverything;
1656        if( atom->flags & ADDED_F_SEED_FLAG )
1657            peerHasEverything = TRUE;
1658        else if( peer->progress < tr_cpPercentDone( tor->completion ) )
1659            peerHasEverything = FALSE;
1660        else {
1661            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( tor->completion ) );
1662            tr_bitfieldDifference( tmp, peer->have );
1663            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
1664            tr_bitfieldFree( tmp );
1665        }
1666        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30) ) ) {
1667            tordbg( t, "purging peer %s because we're both seeds", tr_peerIoAddrStr(&atom->addr,atom->port) );
1668            return TRUE;
1669        }
1670    }
1671
1672    /* disconnect if it's been too long since piece data has been transferred.
1673     * this is on a sliding scale based on number of available peers... */
1674    {
1675        const int relaxStrictnessIfFewerThanN = (int)((getMaxPeerCount(tor) * 0.9) + 0.5);
1676        /* if we have >= relaxIfFewerThan, strictness is 100%.
1677         * if we have zero connections, strictness is 0% */
1678        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
1679            ? 1.0
1680            : peerCount / (float)relaxStrictnessIfFewerThanN;
1681        const int lo = MIN_UPLOAD_IDLE_SECS;
1682        const int hi = MAX_UPLOAD_IDLE_SECS;
1683        const int limit = lo + ((hi-lo) * strictness);
1684        const time_t then = peer->pieceDataActivityDate;
1685        const int idleTime = then ? (now-then) : 0;
1686        if( idleTime > limit ) {
1687            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
1688                       tr_peerIoAddrStr(&atom->addr,atom->port), idleTime );
1689            return TRUE;
1690        }
1691    }
1692
1693    return FALSE;
1694}
1695
1696static tr_peer **
1697getPeersToClose( Torrent * t, int * setmeSize )
1698{
1699    int i, peerCount, outsize;
1700    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( t->peers, &peerCount );
1701    struct tr_peer ** ret = tr_new( tr_peer*, peerCount );
1702
1703    assert( torrentIsLocked( t ) );
1704
1705    for( i=outsize=0; i<peerCount; ++i )
1706        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
1707            ret[outsize++] = peers[i];
1708
1709    *setmeSize = outsize;
1710    return ret;
1711}
1712
1713static int
1714compareCandidates( const void * va, const void * vb )
1715{
1716    const struct peer_atom * a = * (const struct peer_atom**) va;
1717    const struct peer_atom * b = * (const struct peer_atom**) vb;
1718    int i;
1719
1720    if( a->piece_data_time > b->piece_data_time ) return -1;
1721    if( a->piece_data_time < b->piece_data_time ) return  1;
1722
1723    if(( i = tr_compareUint16( a->numFails, b->numFails )))
1724        return i;
1725
1726    if( a->time != b->time )
1727        return a->time < b->time ? -1 : 1;
1728
1729    return 0;
1730}
1731
1732static struct peer_atom **
1733getPeerCandidates( Torrent * t, int * setmeSize )
1734{
1735    int i, atomCount, retCount;
1736    struct peer_atom ** atoms;
1737    struct peer_atom ** ret;
1738    const time_t now = time( NULL );
1739    const int seed = tr_torrentIsSeed( t->tor );
1740
1741    assert( torrentIsLocked( t ) );
1742
1743    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &atomCount );
1744    ret = tr_new( struct peer_atom*, atomCount );
1745    for( i=retCount=0; i<atomCount; ++i )
1746    {
1747        struct peer_atom * atom = atoms[i];
1748
1749        /* peer fed us too much bad data ... we only keep it around
1750         * now to weed it out in case someone sends it to us via pex */
1751        if( atom->myflags & MYFLAG_BANNED )
1752            continue;
1753
1754        /* peer was unconnectable before, so we're not going to keep trying.
1755         * this is needs a separate flag from `banned', since if they try
1756         * to connect to us later, we'll let them in */
1757        if( atom->myflags & MYFLAG_UNREACHABLE )
1758            continue;
1759
1760        /* we don't need two connections to the same peer... */
1761        if( peerIsInUse( t, &atom->addr ) )
1762            continue;
1763
1764        /* no need to connect if we're both seeds... */
1765        if( seed && (atom->flags & ADDED_F_SEED_FLAG) )
1766            continue;
1767
1768        /* we're wasting our time trying to connect to this bozo. */
1769        if( atom->numFails > 3 )
1770            continue;
1771
1772        /* If we were connected to this peer recently and transferring
1773         * piece data, try to reconnect -- network troubles may have
1774         * disconnected us.  but if we weren't sharing piece data,
1775         * hold off on this peer to give another one a try instead */
1776        if( ( now - atom->piece_data_time ) > 30 )
1777        {
1778            int minWait = (60 * 10); /* ten minutes */
1779            int maxWait = (60 * 30); /* thirty minutes */
1780            int wait = atom->numFails * minWait;
1781            if( wait < minWait ) wait = minWait;
1782            if( wait > maxWait ) wait = maxWait;
1783            if( ( now - atom->time ) < wait ) {
1784                tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
1785                        i, tr_peerIoAddrStr(&atom->addr,atom->port), wait );
1786                continue;
1787            }
1788        }
1789
1790        /* Don't connect to peers in our blocklist */
1791        if( tr_blocklistHasAddress( t->manager->handle, &atom->addr ) )
1792            continue;
1793
1794        ret[retCount++] = atom;
1795    }
1796
1797    qsort( ret, retCount, sizeof(struct peer_atom*), compareCandidates );
1798    *setmeSize = retCount;
1799    return ret;
1800}
1801
1802static int
1803reconnectPulse( void * vtorrent )
1804{
1805    Torrent * t = vtorrent;
1806    static time_t prevTime = 0;
1807    static int newConnectionsThisSecond = 0;
1808    time_t now;
1809
1810    torrentLock( t );
1811
1812    now = time( NULL );
1813    if( prevTime != now )
1814    {
1815        prevTime = now;
1816        newConnectionsThisSecond = 0;
1817    }
1818
1819    if( !t->isRunning )
1820    {
1821        removeAllPeers( t );
1822    }
1823    else
1824    {
1825        int i, nCandidates, nBad;
1826        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
1827        struct tr_peer ** connections = getPeersToClose( t, &nBad );
1828
1829        if( nBad || nCandidates )
1830            tordbg( t, "reconnect pulse for [%s]: %d bad connections, "
1831                       "%d connection candidates, %d atoms, max per pulse is %d",
1832                       t->tor->info.name, nBad, nCandidates,
1833                       tr_ptrArraySize(t->pool),
1834                       (int)MAX_RECONNECTIONS_PER_PULSE );
1835
1836        /* disconnect some peers.
1837           if we got transferred piece data, then they might be good peers,
1838           so reset their `numFails' weight to zero.  otherwise we connected
1839           to them fruitlessly, so mark it as another fail */
1840        for( i=0; i<nBad; ++i ) {
1841            tr_peer * peer = connections[i];
1842            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1843            if( peer->pieceDataActivityDate )
1844                atom->numFails = 0;
1845            else
1846                ++atom->numFails;
1847            removePeer( t, peer );
1848        }
1849
1850        /* add some new ones */
1851        for( i=0;    i < nCandidates
1852                  && i < MAX_RECONNECTIONS_PER_PULSE
1853                  && newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND; ++i )
1854        {
1855            tr_peerMgr * mgr = t->manager;
1856            struct peer_atom * atom = candidates[i];
1857            tr_peerIo * io;
1858
1859            tordbg( t, "Starting an OUTGOING connection with %s",
1860                       tr_peerIoAddrStr( &atom->addr, atom->port ) );
1861
1862            io = tr_peerIoNewOutgoing( mgr->handle, &atom->addr, atom->port, t->hash );
1863            if( io == NULL )
1864            {
1865                atom->myflags |= MYFLAG_UNREACHABLE;
1866            }
1867            else
1868            {
1869                tr_handshake * handshake = tr_handshakeNew( io,
1870                                                            mgr->handle->encryptionMode,
1871                                                            myHandshakeDoneCB,
1872                                                            mgr );
1873
1874                assert( tr_peerIoGetTorrentHash( io ) != NULL );
1875
1876                ++newConnectionsThisSecond;
1877
1878                tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake, handshakeCompare );
1879            }
1880
1881            atom->time = time( NULL );
1882        }
1883
1884        /* cleanup */
1885        tr_free( connections );
1886        tr_free( candidates );
1887    }
1888
1889    torrentUnlock( t );
1890    return TRUE;
1891}
Note: See TracBrowser for help on using the repository browser.