source: trunk/libtransmission/peer-mgr.c @ 6008

Last change on this file since 6008 was 6008, checked in by charles, 14 years ago

#988: Optimistic unchoke can unchoke uninterested peers

  • Property svn:keywords set to Date Rev Author Id
File size: 53.2 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 6008 2008-06-03 04:29:56Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp, strstr */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "blocklist.h"
23#include "clients.h"
24#include "completion.h"
25#include "crypto.h"
26#include "handshake.h"
27#include "net.h"
28#include "peer-io.h"
29#include "peer-mgr.h"
30#include "peer-mgr-private.h"
31#include "peer-msgs.h"
32#include "ptrarray.h"
33#include "ratecontrol.h"
34#include "torrent.h"
35#include "trevent.h"
36#include "utils.h"
37
38enum
39{
40    /* how frequently to change which peers are choked */
41    RECHOKE_PERIOD_MSEC = (10 * 1000),
42
43    /* how frequently to refill peers' request lists */
44    REFILL_PERIOD_MSEC = 666,
45
46    /* following the BT spec, we consider ourselves `snubbed' if
47     * we're we don't get piece data from a peer in this long */
48    SNUBBED_SEC = 60,
49
50    /* when many peers are available, keep idle ones this long */
51    MIN_UPLOAD_IDLE_SECS = (60 * 3),
52
53    /* when few peers are available, keep idle ones this long */
54    MAX_UPLOAD_IDLE_SECS = (60 * 10),
55
56    /* how frequently to decide which peers live and die */
57    RECONNECT_PERIOD_MSEC = (2 * 1000),
58
59    /* max # of peers to ask fer per torrent per reconnect pulse */
60    MAX_RECONNECTIONS_PER_PULSE = 1,
61
62    /* max number of peers to ask for per second overall.
63     * this throttle is to avoid overloading the router */
64    MAX_CONNECTIONS_PER_SECOND = 8,
65
66    /* number of unchoked peers per torrent */
67    MAX_UNCHOKED_PEERS = 12,
68
69    /* number of bad pieces a peer is allowed to send before we ban them */
70    MAX_BAD_PIECES_PER_PEER = 3,
71
72    /* use for bitwise operations w/peer_atom.myflags */
73    MYFLAG_BANNED = 1,
74
75    /* unreachable for now... but not banned.  if they try to connect to us it's okay */
76    MYFLAG_UNREACHABLE = 2
77};
78
79
80/**
81***
82**/
83
84/* We keep one of these for every peer we know about, whether
85 * it's connected or not, so the struct must be small.
86 * When our current connections underperform, we dip back
87 * into this list for new ones. */
88struct peer_atom
89{   
90    uint8_t from;
91    uint8_t flags; /* these match the added_f flags */
92    uint8_t myflags; /* flags that aren't defined in added_f */
93    uint16_t port;
94    uint16_t numFails;
95    struct in_addr addr;
96    time_t time;
97    time_t piece_data_time;
98};
99
100typedef struct
101{
102    uint8_t hash[SHA_DIGEST_LENGTH];
103    tr_ptrArray * outgoingHandshakes; /* tr_handshake */
104    tr_ptrArray * pool; /* struct peer_atom */
105    tr_ptrArray * peers; /* tr_peer */
106    tr_timer * reconnectTimer;
107    tr_timer * rechokeTimer;
108    tr_timer * refillTimer;
109    tr_torrent * tor;
110    tr_peer * optimistic; /* the optimistic peer, or NULL if none */
111    tr_bitfield * requested;
112
113    unsigned int isRunning : 1;
114
115    struct tr_peerMgr * manager;
116}
117Torrent;
118
119struct tr_peerMgr
120{
121    tr_handle * handle;
122    tr_ptrArray * torrents; /* Torrent */
123    tr_ptrArray * incomingHandshakes; /* tr_handshake */
124};
125
126#define tordbg(t, fmt...) tr_deepLog( __FILE__, __LINE__, t->tor->info.name, ##fmt )
127
128/**
129***
130**/
131
132static void
133managerLock( const struct tr_peerMgr * manager )
134{
135    tr_globalLock( manager->handle );
136}
137static void
138managerUnlock( const struct tr_peerMgr * manager )
139{
140    tr_globalUnlock( manager->handle );
141}
142static void
143torrentLock( Torrent * torrent )
144{
145    managerLock( torrent->manager );
146}
147static void
148torrentUnlock( Torrent * torrent )
149{
150    managerUnlock( torrent->manager );
151}
152static int
153torrentIsLocked( const Torrent * t )
154{
155    return tr_globalIsLocked( t->manager->handle );
156}
157
158/**
159***
160**/
161
162static int
163compareAddresses( const struct in_addr * a, const struct in_addr * b )
164{
165    return tr_compareUint32( a->s_addr, b->s_addr );
166}
167
168static int
169handshakeCompareToAddr( const void * va, const void * vb )
170{
171    const tr_handshake * a = va;
172    return compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
173}
174
175static int
176handshakeCompare( const void * a, const void * b )
177{
178    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
179}
180
181static tr_handshake*
182getExistingHandshake( tr_ptrArray * handshakes, const struct in_addr * in_addr )
183{
184    return tr_ptrArrayFindSorted( handshakes,
185                                  in_addr,
186                                  handshakeCompareToAddr );
187}
188
189static int
190comparePeerAtomToAddress( const void * va, const void * vb )
191{
192    const struct peer_atom * a = va;
193    return compareAddresses( &a->addr, vb );
194}
195
196static int
197comparePeerAtoms( const void * va, const void * vb )
198{
199    const struct peer_atom * b = vb;
200    return comparePeerAtomToAddress( va, &b->addr );
201}
202
203/**
204***
205**/
206
207static int
208torrentCompare( const void * va, const void * vb )
209{
210    const Torrent * a = va;
211    const Torrent * b = vb;
212    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
213}
214
215static int
216torrentCompareToHash( const void * va, const void * vb )
217{
218    const Torrent * a = va;
219    const uint8_t * b_hash = vb;
220    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
221}
222
223static Torrent*
224getExistingTorrent( tr_peerMgr * manager, const uint8_t * hash )
225{
226    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
227                                             hash,
228                                             torrentCompareToHash );
229}
230
231static int
232peerCompare( const void * va, const void * vb )
233{
234    const tr_peer * a = va;
235    const tr_peer * b = vb;
236    return compareAddresses( &a->in_addr, &b->in_addr );
237}
238
239static int
240peerCompareToAddr( const void * va, const void * vb )
241{
242    const tr_peer * a = va;
243    return compareAddresses( &a->in_addr, vb );
244}
245
246static tr_peer*
247getExistingPeer( Torrent * torrent, const struct in_addr * in_addr )
248{
249    assert( torrentIsLocked( torrent ) );
250    assert( in_addr != NULL );
251
252    return (tr_peer*) tr_ptrArrayFindSorted( torrent->peers,
253                                             in_addr,
254                                             peerCompareToAddr );
255}
256
257static struct peer_atom*
258getExistingAtom( const Torrent * t, const struct in_addr * addr )
259{
260    assert( torrentIsLocked( t ) );
261    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
262}
263
264static int
265peerIsInUse( const Torrent * ct, const struct in_addr * addr )
266{
267    Torrent * t = (Torrent*) ct;
268
269    assert( torrentIsLocked ( t ) );
270
271    return getExistingPeer( t, addr )
272        || getExistingHandshake( t->outgoingHandshakes, addr )
273        || getExistingHandshake( t->manager->incomingHandshakes, addr );
274}
275
276static tr_peer*
277peerConstructor( const struct in_addr * in_addr )
278{
279    tr_peer * p;
280    p = tr_new0( tr_peer, 1 );
281    p->rcToClient = tr_rcInit( );
282    p->rcToPeer = tr_rcInit( );
283    memcpy( &p->in_addr, in_addr, sizeof(struct in_addr) );
284    return p;
285}
286
287static tr_peer*
288getPeer( Torrent * torrent, const struct in_addr * in_addr )
289{
290    tr_peer * peer;
291
292    assert( torrentIsLocked( torrent ) );
293
294    peer = getExistingPeer( torrent, in_addr );
295
296    if( peer == NULL ) {
297        peer = peerConstructor( in_addr );
298        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
299    }
300
301    return peer;
302}
303
304static void
305peerDestructor( tr_peer * peer )
306{
307    assert( peer != NULL );
308    assert( peer->msgs != NULL );
309
310    tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
311    tr_peerMsgsFree( peer->msgs );
312
313    tr_peerIoFree( peer->io );
314
315    tr_bitfieldFree( peer->have );
316    tr_bitfieldFree( peer->blame );
317    tr_rcClose( peer->rcToClient );
318    tr_rcClose( peer->rcToPeer );
319    tr_free( peer->client );
320    tr_free( peer );
321}
322
323static void
324removePeer( Torrent * t, tr_peer * peer )
325{
326    tr_peer * removed;
327    struct peer_atom * atom;
328
329    assert( torrentIsLocked( t ) );
330
331    atom = getExistingAtom( t, &peer->in_addr );
332    assert( atom != NULL );
333    atom->time = time( NULL );
334
335    removed = tr_ptrArrayRemoveSorted  ( t->peers, peer, peerCompare );
336    assert( removed == peer );
337    peerDestructor( removed );
338}
339
340static void
341removeAllPeers( Torrent * t )
342{
343    while( !tr_ptrArrayEmpty( t->peers ) )
344        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
345}
346
347static void
348torrentDestructor( Torrent * t )
349{
350    uint8_t hash[SHA_DIGEST_LENGTH];
351
352    assert( t != NULL );
353    assert( !t->isRunning );
354    assert( t->peers != NULL );
355    assert( torrentIsLocked( t ) );
356    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
357    assert( tr_ptrArrayEmpty( t->peers ) );
358
359    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
360
361    tr_timerFree( &t->reconnectTimer );
362    tr_timerFree( &t->rechokeTimer );
363    tr_timerFree( &t->refillTimer );
364
365    tr_bitfieldFree( t->requested );
366    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
367    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
368    tr_ptrArrayFree( t->peers, NULL );
369
370    tr_free( t );
371}
372
373static Torrent*
374torrentConstructor( tr_peerMgr * manager, tr_torrent * tor )
375{
376    Torrent * t;
377
378    t = tr_new0( Torrent, 1 );
379    t->manager = manager;
380    t->tor = tor;
381    t->pool = tr_ptrArrayNew( );
382    t->peers = tr_ptrArrayNew( );
383    t->outgoingHandshakes = tr_ptrArrayNew( );
384    t->requested = tr_bitfieldNew( tor->blockCount );
385    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
386
387    return t;
388}
389
390/**
391 * For explanation, see http://www.bittorrent.org/fast_extensions.html
392 * Also see the "test-allowed-set" unit test
393 */
394struct tr_bitfield *
395tr_peerMgrGenerateAllowedSet( const uint32_t         k,         /* number of pieces in set */
396                              const uint32_t         sz,        /* number of pieces in torrent */
397                              const uint8_t        * infohash,  /* torrent's SHA1 hash*/
398                              const struct in_addr * ip )       /* peer's address */
399{
400    uint8_t w[SHA_DIGEST_LENGTH + 4];
401    uint8_t x[SHA_DIGEST_LENGTH];
402    tr_bitfield_t * a;
403    uint32_t a_size;
404
405    *(uint32_t*)w = ntohl( htonl(ip->s_addr) & 0xffffff00 );   /* (1) */
406    memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );              /* (2) */
407    tr_sha1( x, w, sizeof( w ), NULL );                        /* (3) */
408
409    a = tr_bitfieldNew( sz );
410    a_size = 0;
411   
412    while( a_size < k )
413    {
414        int i;
415        for ( i=0; i<5 && a_size<k; ++i )                      /* (4) */
416        {
417            uint32_t j = i * 4;                                /* (5) */
418            uint32_t y = ntohl(*(uint32_t*)(x+j));             /* (6) */
419            uint32_t index = y % sz;                           /* (7) */
420            if ( !tr_bitfieldHas( a, index ) ) {               /* (8) */
421                tr_bitfieldAdd( a, index );                    /* (9) */
422                ++a_size;
423            }
424        }
425        tr_sha1( x, x, sizeof( x ), NULL );                    /* (3) */
426    }
427   
428    return a;
429}
430
431tr_peerMgr*
432tr_peerMgrNew( tr_handle * handle )
433{
434    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
435    m->handle = handle;
436    m->torrents = tr_ptrArrayNew( );
437    m->incomingHandshakes = tr_ptrArrayNew( );
438    return m;
439}
440
441void
442tr_peerMgrFree( tr_peerMgr * manager )
443{
444    managerLock( manager );
445
446    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
447     * the item from manager->handshakes, so this is a little roundabout... */
448    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
449        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
450    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
451
452    /* free the torrents. */
453    tr_ptrArrayFree( manager->torrents, (PtrArrayForeachFunc)torrentDestructor );
454
455    managerUnlock( manager );
456    tr_free( manager );
457}
458
459static tr_peer**
460getConnectedPeers( Torrent * t, int * setmeCount )
461{
462    int i, peerCount, connectionCount;
463    tr_peer **peers;
464    tr_peer **ret;
465
466    assert( torrentIsLocked( t ) );
467
468    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
469    ret = tr_new( tr_peer*, peerCount );
470
471    for( i=connectionCount=0; i<peerCount; ++i )
472        if( peers[i]->msgs != NULL )
473            ret[connectionCount++] = peers[i];
474
475    *setmeCount = connectionCount;
476    return ret;
477}
478
479static int
480clientIsDownloadingFrom( const tr_peer * peer )
481{
482    return peer->clientIsInterested && !peer->clientIsChoked;
483}
484
485static int
486clientIsUploadingTo( const tr_peer * peer )
487{
488    return peer->peerIsInterested && !peer->peerIsChoked;
489}
490
491/***
492****
493***/
494
495int
496tr_peerMgrPeerIsSeed( const tr_peerMgr       * mgr,
497                      const uint8_t          * torrentHash,
498                      const struct in_addr   * addr )
499{
500    int isSeed = FALSE;
501    const Torrent * t = NULL;
502    const struct peer_atom * atom = NULL;
503
504    t = getExistingTorrent( (tr_peerMgr*)mgr, torrentHash );
505    if( t )
506        atom = getExistingAtom( t, addr );
507    if( atom )
508        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
509
510    return isSeed;
511}
512
513/***
514****  Refill
515***/
516
517struct tr_refill_piece
518{
519    tr_priority_t priority;
520    int missingBlockCount;
521    uint16_t random;
522    uint32_t piece;
523    uint32_t peerCount;
524};
525
526static int
527compareRefillPiece (const void * aIn, const void * bIn)
528{
529    const struct tr_refill_piece * a = aIn;
530    const struct tr_refill_piece * b = bIn;
531
532    /* fewer missing pieces goes first */
533    if( a->missingBlockCount != b->missingBlockCount )
534        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
535   
536    /* if one piece has a higher priority, it goes first */
537    if( a->priority != b->priority )
538        return a->priority > b->priority ? -1 : 1;
539   
540    /* otherwise if one has fewer peers, it goes first */
541    if (a->peerCount != b->peerCount)
542        return a->peerCount < b->peerCount ? -1 : 1;
543
544    /* otherwise go with our random seed */
545    return tr_compareUint16( a->random, b->random );
546}
547
548static int
549isPieceInteresting( const tr_torrent  * tor,
550                    tr_piece_index_t    piece )
551{
552    if( tor->info.pieces[piece].dnd ) /* we don't want it */
553        return 0;
554
555    if( tr_cpPieceIsComplete( tor->completion, piece ) ) /* we have it */
556        return 0;
557
558    return 1;
559}
560
561static uint32_t*
562getPreferredPieces( Torrent     * t,
563                    uint32_t    * pieceCount )
564{
565    const tr_torrent * tor = t->tor;
566    const tr_info * inf = &tor->info;
567    tr_piece_index_t i;
568    uint32_t poolSize = 0;
569    uint32_t * pool = tr_new( uint32_t, inf->pieceCount );
570    int peerCount;
571    tr_peer** peers;
572
573    assert( torrentIsLocked( t ) );
574
575    peers = getConnectedPeers( t, &peerCount );
576
577    for( i=0; i<inf->pieceCount; ++i )
578        if( isPieceInteresting( tor, i ) )
579            pool[poolSize++] = i;
580
581    /* sort the pool from most interesting to least... */
582    if( poolSize > 1 )
583    {
584        uint32_t j;
585        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
586
587        for( j=0; j<poolSize; ++j )
588        {
589            int k;
590            const int piece = pool[j];
591            struct tr_refill_piece * setme = p + j;
592
593            setme->piece = piece;
594            setme->priority = inf->pieces[piece].priority;
595            setme->peerCount = 0;
596            setme->random = tr_rand( UINT16_MAX );
597            setme->missingBlockCount = tr_cpMissingBlocksInPiece( tor->completion, piece );
598
599            for( k=0; k<peerCount; ++k ) {
600                const tr_peer * peer = peers[k];
601                if( peer->peerIsInterested && !peer->clientIsChoked && tr_bitfieldHas( peer->have, piece ) )
602                    ++setme->peerCount;
603            }
604        }
605
606        qsort( p, poolSize, sizeof(struct tr_refill_piece), compareRefillPiece );
607
608        for( j=0; j<poolSize; ++j )
609            pool[j] = p[j].piece;
610
611        tr_free( p );
612    }
613
614    tr_free( peers );
615
616    *pieceCount = poolSize;
617    return pool;
618}
619
620static uint64_t*
621getPreferredBlocks( Torrent * t, tr_block_index_t * setmeCount )
622{
623    int s;
624    uint32_t i;
625    uint32_t pieceCount;
626    uint32_t blockCount;
627    uint32_t unreqCount[3], reqCount[3];
628    uint32_t * pieces;
629    uint64_t * ret, * walk;
630    uint64_t * unreq[3], *req[3];
631    const tr_torrent * tor = t->tor;
632
633    assert( torrentIsLocked( t ) );
634
635    pieces = getPreferredPieces( t, &pieceCount );
636
637    /**
638     * Now we walk through those preferred pieces to find all the blocks
639     * are still missing from them.  We put unrequested blocks first,
640     * of course, but by including requested blocks afterwards, endgame
641     * handling happens naturally.
642     *
643     * By doing this once per priority we also effectively get an endgame
644     * mode for each priority level.  The helps keep high priority files
645     * from getting stuck at 99% due of unresponsive peers.
646     */
647
648    /* make temporary bins for the four tiers of blocks */
649    for( i=0; i<3; ++i ) {
650        req[i] = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
651        reqCount[i] = 0;
652        unreq[i] = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
653        unreqCount[i] = 0;
654    }
655
656    /* sort the blocks into our temp bins */
657    for( i=blockCount=0; i<pieceCount; ++i )
658    {
659        const tr_piece_index_t index = pieces[i];
660        const int priorityIndex = tor->info.pieces[index].priority + 1;
661        const tr_block_index_t begin = tr_torPieceFirstBlock( tor, index );
662        const tr_block_index_t end = begin + tr_torPieceCountBlocks( tor, index );
663        tr_block_index_t block;
664
665        for( block=begin; block<end; ++block )
666        {
667            if( tr_cpBlockIsComplete( tor->completion, block ) )
668                continue;
669
670            ++blockCount;
671
672            if( tr_bitfieldHas( t->requested, block ) )
673            {
674                const uint32_t n = reqCount[priorityIndex]++;
675                req[priorityIndex][n] = block;
676            }
677            else
678            {
679                const uint32_t n = unreqCount[priorityIndex]++;
680                unreq[priorityIndex][n] = block;
681            }
682        }
683    }
684
685    /* join the bins together, going from highest priority to lowest so
686     * the the blocks we want to request first will be first in the list */
687    ret = walk = tr_new( uint64_t, blockCount );
688    for( s=2; s>=0; --s ) {
689        memcpy( walk, unreq[s], sizeof(uint64_t) * unreqCount[s] );
690        walk += unreqCount[s];
691        memcpy( walk, req[s], sizeof(uint64_t) * reqCount[s] );
692        walk += reqCount[s];
693    }
694    assert( ( walk - ret ) == ( int )blockCount );
695    *setmeCount = blockCount;
696
697    /* cleanup */
698    tr_free( pieces );
699    for( i=0; i<3; ++i ) {
700        tr_free( unreq[i] );
701        tr_free( req[i] );
702    }
703    return ret;
704}
705
706static tr_peer**
707getPeersUploadingToClient( Torrent * t, int * setmeCount )
708{
709    int i;
710    int peerCount = 0;
711    int retCount = 0;
712    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
713    tr_peer ** ret = tr_new( tr_peer*, peerCount );
714
715    /* get a list of peers we're downloading from */
716    for( i=0; i<peerCount; ++i )
717        if( clientIsDownloadingFrom( peers[i] ) )
718            ret[retCount++] = peers[i];
719
720    /* pick a different starting point each time so all peers
721     * get a chance at the first blocks in the queue */
722    if( retCount ) {
723        tr_peer ** tmp = tr_new( tr_peer*, retCount );
724        i = tr_rand( retCount );
725        memcpy( tmp, ret, sizeof(tr_peer*) * retCount );
726        memcpy( ret, tmp+i, sizeof(tr_peer*) * (retCount-i) );
727        memcpy( ret+(retCount-i), tmp, sizeof(tr_peer*) * i );
728        tr_free( tmp );
729    }
730
731    *setmeCount = retCount;
732    return ret;
733}
734
735static int
736refillPulse( void * vtorrent )
737{
738    Torrent * t = vtorrent;
739    tr_torrent * tor = t->tor;
740    tr_block_index_t i;
741    int peerCount;
742    tr_peer ** peers;
743    tr_block_index_t blockCount;
744    uint64_t * blocks;
745
746    if( !t->isRunning )
747        return TRUE;
748    if( tr_torrentIsSeed( t->tor ) )
749        return TRUE;
750
751    torrentLock( t );
752    tordbg( t, "Refilling Request Buffers..." );
753
754    blocks = getPreferredBlocks( t, &blockCount );
755    peers = getPeersUploadingToClient( t, &peerCount );
756
757    for( i=0; peerCount && i<blockCount; ++i )
758    {
759        int j;
760
761        const tr_block_index_t block = blocks[i];
762        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
763        const uint32_t begin = (block * tor->blockSize) - (index * tor->info.pieceSize);
764        const uint32_t length = tr_torBlockCountBytes( tor, block );
765
766        assert( tr_torrentReqIsValid( tor, index, begin, length ) );
767        assert( _tr_block( tor, index, begin ) == block );
768        assert( begin < tr_torPieceCountBytes( tor, index ) );
769        assert( (begin + length) <= tr_torPieceCountBytes( tor, index ) );
770
771        /* find a peer who can ask for this block */
772        for( j=0; j<peerCount; )
773        {
774            const int val = tr_peerMsgsAddRequest( peers[j]->msgs, index, begin, length );
775            switch( val )
776            {
777                case TR_ADDREQ_FULL: 
778                case TR_ADDREQ_CLIENT_CHOKED:
779                    memmove( peers+j, peers+j+1, sizeof(tr_peer*)*(--peerCount-j) );
780                    break;
781
782                case TR_ADDREQ_MISSING: 
783                case TR_ADDREQ_DUPLICATE: 
784                    ++j;
785                    break;
786
787                case TR_ADDREQ_OK:
788                    tr_bitfieldAdd( t->requested, block );
789                    j = peerCount;
790                    break;
791
792                default:
793                    assert( 0 && "unhandled value" );
794                    break;
795            }
796        }
797    }
798
799    /* cleanup */
800    tr_free( peers );
801    tr_free( blocks );
802
803    t->refillTimer = NULL;
804    torrentUnlock( t );
805    return FALSE;
806}
807
808static void
809broadcastClientHave( Torrent * t, uint32_t index )
810{
811    int i, size;
812    tr_peer ** peers;
813
814    assert( torrentIsLocked( t ) );
815
816    peers = getConnectedPeers( t, &size );
817    for( i=0; i<size; ++i )
818        tr_peerMsgsHave( peers[i]->msgs, index );
819    tr_free( peers );
820}
821
822static void
823broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
824{
825    int i, size;
826    tr_peer ** peers;
827
828    assert( torrentIsLocked( t ) );
829
830    peers = getConnectedPeers( t, &size );
831    for( i=0; i<size; ++i )
832        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
833    tr_free( peers );
834}
835
836static void
837addStrike( Torrent * t, tr_peer * peer )
838{
839    tordbg( t, "increasing peer %s strike count to %d", tr_peerIoAddrStr(&peer->in_addr,peer->port), peer->strikes+1 );
840
841    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
842    {
843        struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
844        atom->myflags |= MYFLAG_BANNED;
845        peer->doPurge = 1;
846        tordbg( t, "banning peer %s", tr_peerIoAddrStr(&atom->addr,atom->port) );
847    }
848}
849
850static void
851msgsCallbackFunc( void * vpeer, void * vevent, void * vt )
852{
853    tr_peer * peer = vpeer;
854    Torrent * t = (Torrent *) vt;
855    const tr_peermsgs_event * e = (const tr_peermsgs_event *) vevent;
856
857    torrentLock( t );
858
859    switch( e->eventType )
860    {
861        case TR_PEERMSG_NEED_REQ:
862            if( t->refillTimer == NULL )
863                t->refillTimer = tr_timerNew( t->manager->handle,
864                                              refillPulse, t,
865                                              REFILL_PERIOD_MSEC );
866            break;
867
868        case TR_PEERMSG_CANCEL:
869            tr_bitfieldRem( t->requested, _tr_block( t->tor, e->pieceIndex, e->offset ) );
870            break;
871
872        case TR_PEERMSG_PIECE_DATA: {
873            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
874            atom->piece_data_time = time( NULL );
875            break;
876        }
877
878        case TR_PEERMSG_CLIENT_HAVE:
879            broadcastClientHave( t, e->pieceIndex );
880            tr_torrentRecheckCompleteness( t->tor );
881            break;
882
883        case TR_PEERMSG_PEER_PROGRESS: {
884            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
885            const int peerIsSeed = e->progress >= 1.0;
886            if( peerIsSeed ) {
887                tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr(&atom->addr,atom->port) );
888                atom->flags |= ADDED_F_SEED_FLAG;
889            } else {
890                tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr(&atom->addr,atom->port) );
891                atom->flags &= ~ADDED_F_SEED_FLAG;
892            } break;
893        }
894
895        case TR_PEERMSG_CLIENT_BLOCK:
896            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
897            break;
898
899        case TR_PEERMSG_ERROR:
900            if( TR_ERROR_IS_IO( e->err ) ) {
901                t->tor->error = e->err;
902                tr_strlcpy( t->tor->errorString, tr_errorString( e->err ), sizeof(t->tor->errorString) );
903                tr_torrentStop( t->tor );
904            } else if( e->err == TR_ERROR_ASSERT ) {
905                addStrike( t, peer );
906            }
907            peer->doPurge = 1;
908            break;
909
910        default:
911            assert(0);
912    }
913
914    torrentUnlock( t );
915}
916
917static void
918ensureAtomExists( Torrent * t, const struct in_addr * addr, uint16_t port, uint8_t flags, uint8_t from )
919{
920    if( getExistingAtom( t, addr ) == NULL )
921    {
922        struct peer_atom * a;
923        a = tr_new0( struct peer_atom, 1 );
924        a->addr = *addr;
925        a->port = port;
926        a->flags = flags;
927        a->from = from;
928        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr(&a->addr,a->port) );
929        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
930    }
931}
932
933static int
934getMaxPeerCount( const tr_torrent * tor UNUSED )
935{
936    return tor->maxConnectedPeers;
937}
938
939/* FIXME: this is kind of a mess. */
940static void
941myHandshakeDoneCB( tr_handshake    * handshake,
942                   tr_peerIo       * io,
943                   int               isConnected,
944                   const uint8_t   * peer_id,
945                   void            * vmanager )
946{
947    int ok = isConnected;
948    uint16_t port;
949    const struct in_addr * addr;
950    tr_peerMgr * manager = (tr_peerMgr*) vmanager;
951    Torrent * t;
952    tr_handshake * ours;
953
954    assert( io != NULL );
955    assert( isConnected==0 || isConnected==1 );
956
957    t = tr_peerIoHasTorrentHash( io )
958        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
959        : NULL;
960
961    if( tr_peerIoIsIncoming ( io ) )
962        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
963                                        handshake, handshakeCompare );
964    else if( t != NULL )
965        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
966                                        handshake, handshakeCompare );
967    else
968        ours = handshake;
969
970    assert( ours != NULL );
971    assert( ours == handshake );
972
973    if( t != NULL )
974        torrentLock( t );
975
976    addr = tr_peerIoGetAddress( io, &port );
977
978    if( !ok || !t || !t->isRunning )
979    {
980        if( t ) {
981            struct peer_atom * atom = getExistingAtom( t, addr );
982            if( atom )
983                ++atom->numFails;
984        }
985
986        tr_peerIoFree( io );
987    }
988    else /* looking good */
989    {
990        struct peer_atom * atom;
991        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
992        atom = getExistingAtom( t, addr );
993
994        if( atom->myflags & MYFLAG_BANNED )
995        {
996            tordbg( t, "banned peer %s tried to reconnect", tr_peerIoAddrStr(&atom->addr,atom->port) );
997            tr_peerIoFree( io );
998        }
999        else if( tr_ptrArraySize( t->peers ) >= getMaxPeerCount( t->tor ) )
1000        {
1001            tr_peerIoFree( io );
1002        }
1003        else
1004        {
1005            tr_peer * peer = getExistingPeer( t, addr );
1006
1007            if( peer != NULL ) /* we already have this peer */
1008            {
1009                tr_peerIoFree( io );
1010            }
1011            else
1012            {
1013                peer = getPeer( t, addr );
1014                tr_free( peer->client );
1015
1016                if( !peer_id )
1017                    peer->client = NULL;
1018                else {
1019                    char client[128];
1020                    tr_clientForId( client, sizeof( client ), peer_id );
1021                    peer->client = tr_strdup( client );
1022                }
1023                peer->port = port;
1024                peer->io = io;
1025                peer->msgs = tr_peerMsgsNew( t->tor, peer, msgsCallbackFunc, t, &peer->msgsTag );
1026                atom->time = time( NULL );
1027            }
1028        }
1029    }
1030
1031    if( t != NULL )
1032        torrentUnlock( t );
1033}
1034
1035void
1036tr_peerMgrAddIncoming( tr_peerMgr      * manager,
1037                       struct in_addr  * addr,
1038                       uint16_t          port,
1039                       int               socket )
1040{
1041    managerLock( manager );
1042
1043    if( tr_blocklistHasAddress( manager->handle, addr ) )
1044    {
1045        tr_dbg( "Banned IP address \"%s\" tried to connect to us",
1046                inet_ntoa( *addr ) );
1047        tr_netClose( socket );
1048    }
1049    else if( getExistingHandshake( manager->incomingHandshakes, addr ) )
1050    {
1051        tr_netClose( socket );
1052    }
1053    else /* we don't have a connetion to them yet... */
1054    {
1055        tr_peerIo * io;
1056        tr_handshake * handshake;
1057
1058        io = tr_peerIoNewIncoming( manager->handle, addr, port, socket );
1059
1060        handshake = tr_handshakeNew( io,
1061                                     manager->handle->encryptionMode,
1062                                     myHandshakeDoneCB,
1063                                     manager );
1064
1065        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake, handshakeCompare );
1066    }
1067
1068    managerUnlock( manager );
1069}
1070
1071void
1072tr_peerMgrAddPex( tr_peerMgr     * manager,
1073                  const uint8_t  * torrentHash,
1074                  uint8_t          from,
1075                  const tr_pex   * pex )
1076{
1077    Torrent * t;
1078    managerLock( manager );
1079
1080    t = getExistingTorrent( manager, torrentHash );
1081    if( !tr_blocklistHasAddress( t->manager->handle, &pex->in_addr ) )
1082        ensureAtomExists( t, &pex->in_addr, pex->port, pex->flags, from );
1083
1084    managerUnlock( manager );
1085}
1086
1087tr_pex *
1088tr_peerMgrCompactToPex( const void  * compact,
1089                        size_t        compactLen,
1090                        const char  * added_f,
1091                        size_t      * pexCount )
1092{
1093    size_t i;
1094    size_t n = compactLen / 6;
1095    const uint8_t * walk = compact;
1096    tr_pex * pex = tr_new0( tr_pex, n );
1097    for( i=0; i<n; ++i ) {
1098        memcpy( &pex[i].in_addr, walk, 4 ); walk += 4;
1099        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1100        if( added_f )
1101            pex[i].flags = added_f[i];
1102    }
1103    *pexCount = n;
1104    return pex;
1105}
1106
1107/**
1108***
1109**/
1110
1111void
1112tr_peerMgrSetBlame( tr_peerMgr     * manager,
1113                    const uint8_t  * torrentHash,
1114                    int              pieceIndex,
1115                    int              success )
1116{
1117    if( !success )
1118    {
1119        int peerCount, i;
1120        Torrent * t = getExistingTorrent( manager, torrentHash );
1121        tr_peer ** peers;
1122
1123        assert( torrentIsLocked( t ) );
1124
1125        peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1126        for( i=0; i<peerCount; ++i )
1127        {
1128            tr_peer * peer = peers[i];
1129            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1130            {
1131                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1132                           tr_peerIoAddrStr(&peer->in_addr,peer->port),
1133                           pieceIndex, (int)peer->strikes+1 );
1134                addStrike( t, peer );
1135            }
1136        }
1137    }
1138}
1139
1140int
1141tr_pexCompare( const void * va, const void * vb )
1142{
1143    const tr_pex * a = (const tr_pex *) va;
1144    const tr_pex * b = (const tr_pex *) vb;
1145    int i = memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
1146    if( i ) return i;
1147    if( a->port < b->port ) return -1;
1148    if( a->port > b->port ) return 1;
1149    return 0;
1150}
1151
1152int tr_pexCompare( const void * a, const void * b );
1153
1154static int
1155peerPrefersCrypto( const tr_peer * peer )
1156{
1157    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1158        return TRUE;
1159
1160    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1161        return FALSE;
1162
1163    return tr_peerIoIsEncrypted( peer->io );
1164};
1165
1166int
1167tr_peerMgrGetPeers( tr_peerMgr      * manager,
1168                    const uint8_t   * torrentHash,
1169                    tr_pex         ** setme_pex )
1170{
1171    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1172    int i, peerCount;
1173    const tr_peer ** peers;
1174    tr_pex * pex;
1175    tr_pex * walk;
1176
1177    managerLock( manager );
1178
1179    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1180    pex = walk = tr_new( tr_pex, peerCount );
1181
1182    for( i=0; i<peerCount; ++i, ++walk )
1183    {
1184        const tr_peer * peer = peers[i];
1185
1186        walk->in_addr = peer->in_addr;
1187
1188        walk->port = peer->port;
1189
1190        walk->flags = 0;
1191        if( peerPrefersCrypto(peer) )  walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1192        if( peer->progress >= 1.0 )    walk->flags |= ADDED_F_SEED_FLAG;
1193    }
1194
1195    assert( ( walk - pex ) == peerCount );
1196    qsort( pex, peerCount, sizeof(tr_pex), tr_pexCompare );
1197    *setme_pex = pex;
1198
1199    managerUnlock( manager );
1200
1201    return peerCount;
1202}
1203
1204static int reconnectPulse( void * vtorrent );
1205static int rechokePulse( void * vtorrent );
1206
1207void
1208tr_peerMgrStartTorrent( tr_peerMgr     * manager,
1209                        const uint8_t  * torrentHash )
1210{
1211    Torrent * t;
1212
1213    managerLock( manager );
1214
1215    t = getExistingTorrent( manager, torrentHash );
1216
1217    assert( t != NULL );
1218    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1219    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1220
1221    if( !t->isRunning )
1222    {
1223        t->isRunning = 1;
1224
1225        t->reconnectTimer = tr_timerNew( t->manager->handle,
1226                                         reconnectPulse, t,
1227                                         RECONNECT_PERIOD_MSEC );
1228
1229        t->rechokeTimer = tr_timerNew( t->manager->handle,
1230                                       rechokePulse, t,
1231                                       RECHOKE_PERIOD_MSEC );
1232
1233        reconnectPulse( t );
1234
1235        rechokePulse( t );
1236    }
1237
1238    managerUnlock( manager );
1239}
1240
1241static void
1242stopTorrent( Torrent * t )
1243{
1244    assert( torrentIsLocked( t ) );
1245
1246    t->isRunning = 0;
1247    tr_timerFree( &t->rechokeTimer );
1248    tr_timerFree( &t->reconnectTimer );
1249
1250    /* disconnect the peers. */
1251    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)peerDestructor );
1252    tr_ptrArrayClear( t->peers );
1253
1254    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1255     * which removes the handshake from t->outgoingHandshakes... */
1256    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1257        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1258}
1259void
1260tr_peerMgrStopTorrent( tr_peerMgr     * manager,
1261                       const uint8_t  * torrentHash)
1262{
1263    managerLock( manager );
1264
1265    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1266
1267    managerUnlock( manager );
1268}
1269
1270void
1271tr_peerMgrAddTorrent( tr_peerMgr * manager,
1272                      tr_torrent * tor )
1273{
1274    Torrent * t;
1275
1276    managerLock( manager );
1277
1278    assert( tor != NULL );
1279    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1280
1281    t = torrentConstructor( manager, tor );
1282    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1283
1284    managerUnlock( manager );
1285}
1286
1287void
1288tr_peerMgrRemoveTorrent( tr_peerMgr     * manager,
1289                         const uint8_t  * torrentHash )
1290{
1291    Torrent * t;
1292
1293    managerLock( manager );
1294
1295    t = getExistingTorrent( manager, torrentHash );
1296    assert( t != NULL );
1297    stopTorrent( t );
1298    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1299    torrentDestructor( t );
1300
1301    managerUnlock( manager );
1302}
1303
1304void
1305tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1306                               const uint8_t    * torrentHash,
1307                               int8_t           * tab,
1308                               int                tabCount )
1309{
1310    int i;
1311    const Torrent * t;
1312    const tr_torrent * tor;
1313    float interval;
1314
1315    managerLock( (tr_peerMgr*)manager );
1316
1317    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1318    tor = t->tor;
1319    interval = tor->info.pieceCount / (float)tabCount;
1320
1321    memset( tab, 0, tabCount );
1322
1323    for( i=0; i<tabCount; ++i )
1324    {
1325        const int piece = i * interval;
1326
1327        if( tor == NULL )
1328            tab[i] = 0;
1329        else if( tr_cpPieceIsComplete( tor->completion, piece ) )
1330            tab[i] = -1;
1331        else {
1332            int j, peerCount;
1333            const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1334            for( j=0; j<peerCount; ++j )
1335                if( tr_bitfieldHas( peers[j]->have, i ) )
1336                    ++tab[i];
1337        }
1338    }
1339
1340    managerUnlock( (tr_peerMgr*)manager );
1341}
1342
1343/* Returns the pieces that are available from peers */
1344tr_bitfield*
1345tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1346                        const uint8_t    * torrentHash )
1347{
1348    int i, size;
1349    Torrent * t;
1350    tr_peer ** peers;
1351    tr_bitfield * pieces;
1352    managerLock( (tr_peerMgr*)manager );
1353
1354    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1355    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1356    peers = getConnectedPeers( t, &size );
1357    for( i=0; i<size; ++i )
1358        tr_bitfieldOr( pieces, peers[i]->have );
1359
1360    managerUnlock( (tr_peerMgr*)manager );
1361    tr_free( peers );
1362    return pieces;
1363}
1364
1365int
1366tr_peerMgrHasConnections( const tr_peerMgr * manager,
1367                          const uint8_t    * torrentHash )
1368{
1369    int ret;
1370    const Torrent * t;
1371    managerLock( (tr_peerMgr*)manager );
1372
1373    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1374    ret = t && tr_ptrArraySize( t->peers );
1375
1376    managerUnlock( (tr_peerMgr*)manager );
1377    return ret;
1378}
1379
1380void
1381tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1382                        const uint8_t    * torrentHash,
1383                        int              * setmePeersKnown,
1384                        int              * setmePeersConnected,
1385                        int              * setmeSeedsConnected,
1386                        int              * setmePeersSendingToUs,
1387                        int              * setmePeersGettingFromUs,
1388                        int              * setmePeersFrom )
1389{
1390    int i, size;
1391    const Torrent * t;
1392    const tr_peer ** peers;
1393
1394    managerLock( (tr_peerMgr*)manager );
1395
1396    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1397    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1398
1399    *setmePeersKnown          = tr_ptrArraySize( t->pool );
1400    *setmePeersConnected      = 0;
1401    *setmeSeedsConnected      = 0;
1402    *setmePeersSendingToUs    = 0;
1403    *setmePeersGettingFromUs  = 0;
1404
1405    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1406        setmePeersFrom[i] = 0;
1407
1408    for( i=0; i<size; ++i )
1409    {
1410        const tr_peer * peer = peers[i];
1411        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1412
1413        if( peer->io == NULL ) /* not connected */
1414            continue;
1415
1416        ++*setmePeersConnected;
1417
1418        ++setmePeersFrom[atom->from];
1419
1420        if( clientIsDownloadingFrom( peer ) )
1421            ++*setmePeersSendingToUs;
1422
1423        if( clientIsUploadingTo( peer ) )
1424            ++*setmePeersGettingFromUs;
1425
1426        if( atom->flags & ADDED_F_SEED_FLAG )
1427            ++*setmeSeedsConnected;
1428    }
1429
1430    managerUnlock( (tr_peerMgr*)manager );
1431}
1432
1433struct tr_peer_stat *
1434tr_peerMgrPeerStats( const tr_peerMgr  * manager,
1435                     const uint8_t     * torrentHash,
1436                     int               * setmeCount UNUSED )
1437{
1438    int i, size;
1439    const Torrent * t;
1440    tr_peer ** peers;
1441    tr_peer_stat * ret;
1442
1443    assert( manager != NULL );
1444    managerLock( (tr_peerMgr*)manager );
1445
1446    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1447    peers = getConnectedPeers( (Torrent*)t, &size );
1448    ret = tr_new0( tr_peer_stat, size );
1449
1450    for( i=0; i<size; ++i )
1451    {
1452        char * pch;
1453        const tr_peer * peer = peers[i];
1454        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1455        tr_peer_stat * stat = ret + i;
1456
1457        tr_netNtop( &peer->in_addr, stat->addr, sizeof(stat->addr) );
1458        tr_strlcpy( stat->client, (peer->client ? peer->client : ""), sizeof(stat->client) );
1459        stat->port               = peer->port;
1460        stat->from               = atom->from;
1461        stat->progress           = peer->progress;
1462        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1463        stat->uploadToRate       = peer->rateToPeer;
1464        stat->downloadFromRate   = peer->rateToClient;
1465        stat->peerIsChoked       = peer->peerIsChoked;
1466        stat->peerIsInterested   = peer->peerIsInterested;
1467        stat->clientIsChoked     = peer->clientIsChoked;
1468        stat->clientIsInterested = peer->clientIsInterested;
1469        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1470        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1471        stat->isUploadingTo      = clientIsUploadingTo( peer );
1472
1473        pch = stat->flagStr;
1474        if( t->optimistic == peer ) *pch++ = 'O';
1475        if( stat->isDownloadingFrom ) *pch++ = 'D';
1476        else if( stat->clientIsInterested ) *pch++ = 'd';
1477        if( stat->isUploadingTo ) *pch++ = 'U';
1478        else if( stat->peerIsInterested ) *pch++ = 'u';
1479        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
1480        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1481        if( stat->isEncrypted ) *pch++ = 'E';
1482        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1483        if( stat->isIncoming ) *pch++ = 'I';
1484        *pch = '\0';
1485    }
1486
1487    *setmeCount = size;
1488    tr_free( peers );
1489
1490    managerUnlock( (tr_peerMgr*)manager );
1491    return ret;
1492}
1493
1494/**
1495***
1496**/
1497
1498struct ChokeData
1499{
1500    uint8_t doUnchoke;
1501    uint8_t isInterested;
1502    uint32_t rate;
1503    tr_peer * peer;
1504};
1505
1506static int
1507compareChoke( const void * va, const void * vb )
1508{
1509    const struct ChokeData * a = va;
1510    const struct ChokeData * b = vb;
1511    return -tr_compareUint32( a->rate, b->rate );
1512}
1513
1514static int
1515isNew( const tr_peer * peer )
1516{
1517    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1518}
1519
1520static int
1521isSame( const tr_peer * peer )
1522{
1523    return peer && peer->client && strstr( peer->client, "Transmission" );
1524}
1525
1526/**
1527***
1528**/
1529
1530static int
1531getWeightedRate( const tr_peer * peer, int clientIsSeed )
1532{
1533    return (int)( 10.0 * ( clientIsSeed ? peer->rateToPeer
1534                                        : peer->rateToClient ) );
1535}
1536
1537static void
1538rechoke( Torrent * t )
1539{
1540    int i, peerCount, size, unchokedInterested;
1541    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1542    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1543    const int clientIsSeed = tr_torrentIsSeed( t->tor );
1544
1545    assert( torrentIsLocked( t ) );
1546   
1547    /* sort the peers by preference and rate */
1548    for( i=0, size=0; i<peerCount; ++i )
1549    {
1550        tr_peer * peer = peers[i];
1551        if( peer->progress >= 1.0 ) /* choke all seeds */
1552            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1553        else {
1554            struct ChokeData * node = &choke[size++];
1555            node->peer = peer;
1556            node->isInterested = peer->peerIsInterested;
1557            node->rate = getWeightedRate( peer, clientIsSeed );
1558        }
1559    }
1560
1561    qsort( choke, size, sizeof(struct ChokeData), compareChoke );
1562
1563    /**
1564     * Reciprocation and number of uploads capping is managed by unchoking
1565     * the N peers which have the best upload rate and are interested.
1566     * This maximizes the client's download rate. These N peers are
1567     * referred to as downloaders, because they are interested in downloading
1568     * from the client.
1569     *
1570     * Peers which have a better upload rate (as compared to the downloaders)
1571     * but aren't interested get unchoked. If they become interested, the
1572     * downloader with the worst upload rate gets choked. If a client has
1573     * a complete file, it uses its upload rate rather than its download
1574     * rate to decide which peers to unchoke.
1575     */
1576    unchokedInterested = 0;
1577    for( i=0; i<size && unchokedInterested<MAX_UNCHOKED_PEERS; ++i ) {
1578        choke[i].doUnchoke = 1;
1579        if( choke[i].isInterested )
1580            ++unchokedInterested;
1581    }
1582
1583    /* optimistic unchoke */
1584    if( i < size )
1585    {
1586        int n;
1587        struct ChokeData * c;
1588        tr_ptrArray * randPool = tr_ptrArrayNew( );
1589
1590        for( ; i<size; ++i )
1591        {
1592            if( choke[i].isInterested )
1593            {
1594                const tr_peer * peer = choke[i].peer;
1595                int x=1, y;
1596                if( isNew( peer ) ) x *= 3;
1597                if( isSame( peer ) ) x *= 3;
1598                for( y=0; y<x; ++y )
1599                    tr_ptrArrayAppend( randPool, choke );
1600            }
1601        }
1602
1603        if(( n = tr_ptrArraySize( randPool )))
1604        {
1605            c = tr_ptrArrayNth( randPool, tr_rand( n ));
1606            c->doUnchoke = 1;
1607            t->optimistic = c->peer;
1608        }
1609
1610        tr_ptrArrayFree( randPool, NULL );
1611    }
1612
1613    for( i=0; i<size; ++i )
1614        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1615
1616    /* cleanup */
1617    tr_free( choke );
1618    tr_free( peers );
1619}
1620
1621static int
1622rechokePulse( void * vtorrent )
1623{
1624    Torrent * t = vtorrent;
1625    torrentLock( t );
1626    rechoke( t );
1627    torrentUnlock( t );
1628    return TRUE;
1629}
1630
1631/***
1632****
1633****  Life and Death
1634****
1635***/
1636
1637static int
1638shouldPeerBeClosed( const Torrent * t, const tr_peer * peer, int peerCount )
1639{
1640    const tr_torrent * tor = t->tor;
1641    const time_t now = time( NULL );
1642    const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1643
1644    /* if it's marked for purging, close it */
1645    if( peer->doPurge ) {
1646        tordbg( t, "purging peer %s because its doPurge flag is set", tr_peerIoAddrStr(&atom->addr,atom->port) );
1647        return TRUE;
1648    }
1649
1650    /* if we're seeding and the peer has everything we have,
1651     * and enough time has passed for a pex exchange, then disconnect */
1652    if( tr_torrentIsSeed( tor ) ) {
1653        int peerHasEverything;
1654        if( atom->flags & ADDED_F_SEED_FLAG )
1655            peerHasEverything = TRUE;
1656        else if( peer->progress < tr_cpPercentDone( tor->completion ) )
1657            peerHasEverything = FALSE;
1658        else {
1659            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( tor->completion ) );
1660            tr_bitfieldDifference( tmp, peer->have );
1661            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
1662            tr_bitfieldFree( tmp );
1663        }
1664        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30) ) ) {
1665            tordbg( t, "purging peer %s because we're both seeds", tr_peerIoAddrStr(&atom->addr,atom->port) );
1666            return TRUE;
1667        }
1668    }
1669
1670    /* disconnect if it's been too long since piece data has been transferred.
1671     * this is on a sliding scale based on number of available peers... */
1672    {
1673        const int relaxStrictnessIfFewerThanN = (int)((getMaxPeerCount(tor) * 0.9) + 0.5);
1674        /* if we have >= relaxIfFewerThan, strictness is 100%.
1675         * if we have zero connections, strictness is 0% */
1676        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
1677            ? 1.0
1678            : peerCount / (float)relaxStrictnessIfFewerThanN;
1679        const int lo = MIN_UPLOAD_IDLE_SECS;
1680        const int hi = MAX_UPLOAD_IDLE_SECS;
1681        const int limit = lo + ((hi-lo) * strictness);
1682        const time_t then = peer->pieceDataActivityDate;
1683        const int idleTime = then ? (now-then) : 0;
1684        if( idleTime > limit ) {
1685            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
1686                       tr_peerIoAddrStr(&atom->addr,atom->port), idleTime );
1687            return TRUE;
1688        }
1689    }
1690
1691    return FALSE;
1692}
1693
1694static tr_peer **
1695getPeersToClose( Torrent * t, int * setmeSize )
1696{
1697    int i, peerCount, outsize;
1698    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( t->peers, &peerCount );
1699    struct tr_peer ** ret = tr_new( tr_peer*, peerCount );
1700
1701    assert( torrentIsLocked( t ) );
1702
1703    for( i=outsize=0; i<peerCount; ++i )
1704        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
1705            ret[outsize++] = peers[i];
1706
1707    *setmeSize = outsize;
1708    return ret;
1709}
1710
1711static int
1712compareCandidates( const void * va, const void * vb )
1713{
1714    const struct peer_atom * a = * (const struct peer_atom**) va;
1715    const struct peer_atom * b = * (const struct peer_atom**) vb;
1716    int i;
1717
1718    if( a->piece_data_time > b->piece_data_time ) return -1;
1719    if( a->piece_data_time < b->piece_data_time ) return  1;
1720
1721    if(( i = tr_compareUint16( a->numFails, b->numFails )))
1722        return i;
1723
1724    if( a->time != b->time )
1725        return a->time < b->time ? -1 : 1;
1726
1727    return 0;
1728}
1729
1730static struct peer_atom **
1731getPeerCandidates( Torrent * t, int * setmeSize )
1732{
1733    int i, atomCount, retCount;
1734    struct peer_atom ** atoms;
1735    struct peer_atom ** ret;
1736    const time_t now = time( NULL );
1737    const int seed = tr_torrentIsSeed( t->tor );
1738
1739    assert( torrentIsLocked( t ) );
1740
1741    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &atomCount );
1742    ret = tr_new( struct peer_atom*, atomCount );
1743    for( i=retCount=0; i<atomCount; ++i )
1744    {
1745        struct peer_atom * atom = atoms[i];
1746
1747        /* peer fed us too much bad data ... we only keep it around
1748         * now to weed it out in case someone sends it to us via pex */
1749        if( atom->myflags & MYFLAG_BANNED )
1750            continue;
1751
1752        /* peer was unconnectable before, so we're not going to keep trying.
1753         * this is needs a separate flag from `banned', since if they try
1754         * to connect to us later, we'll let them in */
1755        if( atom->myflags & MYFLAG_UNREACHABLE )
1756            continue;
1757
1758        /* we don't need two connections to the same peer... */
1759        if( peerIsInUse( t, &atom->addr ) )
1760            continue;
1761
1762        /* no need to connect if we're both seeds... */
1763        if( seed && (atom->flags & ADDED_F_SEED_FLAG) )
1764            continue;
1765
1766        /* we're wasting our time trying to connect to this bozo. */
1767        if( atom->numFails > 3 )
1768            continue;
1769
1770        /* If we were connected to this peer recently and transferring
1771         * piece data, try to reconnect -- network troubles may have
1772         * disconnected us.  but if we weren't sharing piece data,
1773         * hold off on this peer to give another one a try instead */
1774        if( ( now - atom->piece_data_time ) > 30 )
1775        {
1776            int minWait = (60 * 10); /* ten minutes */
1777            int maxWait = (60 * 30); /* thirty minutes */
1778            int wait = atom->numFails * minWait;
1779            if( wait < minWait ) wait = minWait;
1780            if( wait > maxWait ) wait = maxWait;
1781            if( ( now - atom->time ) < wait ) {
1782                tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
1783                        i, tr_peerIoAddrStr(&atom->addr,atom->port), wait );
1784                continue;
1785            }
1786        }
1787
1788        /* Don't connect to peers in our blocklist */
1789        if( tr_blocklistHasAddress( t->manager->handle, &atom->addr ) )
1790            continue;
1791
1792        ret[retCount++] = atom;
1793    }
1794
1795    qsort( ret, retCount, sizeof(struct peer_atom*), compareCandidates );
1796    *setmeSize = retCount;
1797    return ret;
1798}
1799
1800static int
1801reconnectPulse( void * vtorrent )
1802{
1803    Torrent * t = vtorrent;
1804    static time_t prevTime = 0;
1805    static int newConnectionsThisSecond = 0;
1806    time_t now;
1807
1808    torrentLock( t );
1809
1810    now = time( NULL );
1811    if( prevTime != now )
1812    {
1813        prevTime = now;
1814        newConnectionsThisSecond = 0;
1815    }
1816
1817    if( !t->isRunning )
1818    {
1819        removeAllPeers( t );
1820    }
1821    else
1822    {
1823        int i, nCandidates, nBad;
1824        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
1825        struct tr_peer ** connections = getPeersToClose( t, &nBad );
1826
1827        if( nBad || nCandidates )
1828            tordbg( t, "reconnect pulse for [%s]: %d bad connections, "
1829                       "%d connection candidates, %d atoms, max per pulse is %d",
1830                       t->tor->info.name, nBad, nCandidates,
1831                       tr_ptrArraySize(t->pool),
1832                       (int)MAX_RECONNECTIONS_PER_PULSE );
1833
1834        /* disconnect some peers.
1835           if we got transferred piece data, then they might be good peers,
1836           so reset their `numFails' weight to zero.  otherwise we connected
1837           to them fruitlessly, so mark it as another fail */
1838        for( i=0; i<nBad; ++i ) {
1839            tr_peer * peer = connections[i];
1840            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1841            if( peer->pieceDataActivityDate )
1842                atom->numFails = 0;
1843            else
1844                ++atom->numFails;
1845            removePeer( t, peer );
1846        }
1847
1848        /* add some new ones */
1849        for( i=0;    i < nCandidates
1850                  && i < MAX_RECONNECTIONS_PER_PULSE
1851                  && newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND; ++i )
1852        {
1853            tr_peerMgr * mgr = t->manager;
1854            struct peer_atom * atom = candidates[i];
1855            tr_peerIo * io;
1856
1857            tordbg( t, "Starting an OUTGOING connection with %s",
1858                       tr_peerIoAddrStr( &atom->addr, atom->port ) );
1859
1860            io = tr_peerIoNewOutgoing( mgr->handle, &atom->addr, atom->port, t->hash );
1861            if( io == NULL )
1862            {
1863                atom->myflags |= MYFLAG_UNREACHABLE;
1864            }
1865            else
1866            {
1867                tr_handshake * handshake = tr_handshakeNew( io,
1868                                                            mgr->handle->encryptionMode,
1869                                                            myHandshakeDoneCB,
1870                                                            mgr );
1871
1872                assert( tr_peerIoGetTorrentHash( io ) != NULL );
1873
1874                ++newConnectionsThisSecond;
1875
1876                tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake, handshakeCompare );
1877            }
1878
1879            atom->time = time( NULL );
1880        }
1881
1882        /* cleanup */
1883        tr_free( connections );
1884        tr_free( candidates );
1885    }
1886
1887    torrentUnlock( t );
1888    return TRUE;
1889}
Note: See TracBrowser for help on using the repository browser.