source: trunk/libtransmission/peer-mgr.c @ 6073

Last change on this file since 6073 was 6073, checked in by charles, 13 years ago

#800 initial support for GetRight?-style fetching of data through http and ftp servers specified in the .torrent's "url-list" tag

  • Property svn:keywords set to Date Rev Author Id
File size: 56.8 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 6073 2008-06-07 21:26:41Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp, strstr */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "blocklist.h"
23#include "clients.h"
24#include "completion.h"
25#include "crypto.h"
26#include "handshake.h"
27#include "inout.h" /* tr_ioTestPiece */
28#include "net.h"
29#include "peer-io.h"
30#include "peer-mgr.h"
31#include "peer-mgr-private.h"
32#include "peer-msgs.h"
33#include "ptrarray.h"
34#include "ratecontrol.h"
35#include "stats.h" /* tr_statsAddDownloaded */
36#include "torrent.h"
37#include "trevent.h"
38#include "utils.h"
39#include "webseed.h"
40
41enum
42{
43    /* how frequently to change which peers are choked */
44    RECHOKE_PERIOD_MSEC = (10 * 1000),
45
46    /* how frequently to refill peers' request lists */
47    REFILL_PERIOD_MSEC = 666,
48
49    /* following the BT spec, we consider ourselves `snubbed' if
50     * we're we don't get piece data from a peer in this long */
51    SNUBBED_SEC = 60,
52
53    /* when many peers are available, keep idle ones this long */
54    MIN_UPLOAD_IDLE_SECS = (60 * 3),
55
56    /* when few peers are available, keep idle ones this long */
57    MAX_UPLOAD_IDLE_SECS = (60 * 10),
58
59    /* how frequently to decide which peers live and die */
60    RECONNECT_PERIOD_MSEC = (2 * 1000),
61
62    /* max # of peers to ask fer per torrent per reconnect pulse */
63    MAX_RECONNECTIONS_PER_PULSE = 1,
64
65    /* max number of peers to ask for per second overall.
66     * this throttle is to avoid overloading the router */
67    MAX_CONNECTIONS_PER_SECOND = 8,
68
69    /* number of unchoked peers per torrent */
70    MAX_UNCHOKED_PEERS = 12,
71
72    /* number of bad pieces a peer is allowed to send before we ban them */
73    MAX_BAD_PIECES_PER_PEER = 3,
74
75    /* use for bitwise operations w/peer_atom.myflags */
76    MYFLAG_BANNED = 1,
77
78    /* unreachable for now... but not banned.  if they try to connect to us it's okay */
79    MYFLAG_UNREACHABLE = 2
80};
81
82
83/**
84***
85**/
86
87/* We keep one of these for every peer we know about, whether
88 * it's connected or not, so the struct must be small.
89 * When our current connections underperform, we dip back
90 * into this list for new ones. */
91struct peer_atom
92{   
93    uint8_t from;
94    uint8_t flags; /* these match the added_f flags */
95    uint8_t myflags; /* flags that aren't defined in added_f */
96    uint16_t port;
97    uint16_t numFails;
98    struct in_addr addr;
99    time_t time;
100    time_t piece_data_time;
101};
102
103typedef struct
104{
105    uint8_t hash[SHA_DIGEST_LENGTH];
106    tr_ptrArray * outgoingHandshakes; /* tr_handshake */
107    tr_ptrArray * pool; /* struct peer_atom */
108    tr_ptrArray * peers; /* tr_peer */
109    tr_ptrArray * webseeds; /* tr_webseed */
110    tr_timer * reconnectTimer;
111    tr_timer * rechokeTimer;
112    tr_timer * refillTimer;
113    tr_torrent * tor;
114    tr_peer * optimistic; /* the optimistic peer, or NULL if none */
115    tr_bitfield * requested;
116
117    unsigned int isRunning : 1;
118
119    struct tr_peerMgr * manager;
120}
121Torrent;
122
123struct tr_peerMgr
124{
125    tr_handle * handle;
126    tr_ptrArray * torrents; /* Torrent */
127    tr_ptrArray * incomingHandshakes; /* tr_handshake */
128};
129
130#define tordbg(t, fmt...) tr_deepLog( __FILE__, __LINE__, t->tor->info.name, ##fmt )
131
132/**
133***
134**/
135
136static void
137managerLock( const struct tr_peerMgr * manager )
138{
139    tr_globalLock( manager->handle );
140}
141static void
142managerUnlock( const struct tr_peerMgr * manager )
143{
144    tr_globalUnlock( manager->handle );
145}
146static void
147torrentLock( Torrent * torrent )
148{
149    managerLock( torrent->manager );
150}
151static void
152torrentUnlock( Torrent * torrent )
153{
154    managerUnlock( torrent->manager );
155}
156static int
157torrentIsLocked( const Torrent * t )
158{
159    return tr_globalIsLocked( t->manager->handle );
160}
161
162/**
163***
164**/
165
166static int
167compareAddresses( const struct in_addr * a, const struct in_addr * b )
168{
169    return tr_compareUint32( a->s_addr, b->s_addr );
170}
171
172static int
173handshakeCompareToAddr( const void * va, const void * vb )
174{
175    const tr_handshake * a = va;
176    return compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
177}
178
179static int
180handshakeCompare( const void * a, const void * b )
181{
182    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
183}
184
185static tr_handshake*
186getExistingHandshake( tr_ptrArray * handshakes, const struct in_addr * in_addr )
187{
188    return tr_ptrArrayFindSorted( handshakes,
189                                  in_addr,
190                                  handshakeCompareToAddr );
191}
192
193static int
194comparePeerAtomToAddress( const void * va, const void * vb )
195{
196    const struct peer_atom * a = va;
197    return compareAddresses( &a->addr, vb );
198}
199
200static int
201comparePeerAtoms( const void * va, const void * vb )
202{
203    const struct peer_atom * b = vb;
204    return comparePeerAtomToAddress( va, &b->addr );
205}
206
207/**
208***
209**/
210
211static int
212torrentCompare( const void * va, const void * vb )
213{
214    const Torrent * a = va;
215    const Torrent * b = vb;
216    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
217}
218
219static int
220torrentCompareToHash( const void * va, const void * vb )
221{
222    const Torrent * a = va;
223    const uint8_t * b_hash = vb;
224    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
225}
226
227static Torrent*
228getExistingTorrent( tr_peerMgr * manager, const uint8_t * hash )
229{
230    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
231                                             hash,
232                                             torrentCompareToHash );
233}
234
235static int
236peerCompare( const void * va, const void * vb )
237{
238    const tr_peer * a = va;
239    const tr_peer * b = vb;
240    return compareAddresses( &a->in_addr, &b->in_addr );
241}
242
243static int
244peerCompareToAddr( const void * va, const void * vb )
245{
246    const tr_peer * a = va;
247    return compareAddresses( &a->in_addr, vb );
248}
249
250static tr_peer*
251getExistingPeer( Torrent * torrent, const struct in_addr * in_addr )
252{
253    assert( torrentIsLocked( torrent ) );
254    assert( in_addr != NULL );
255
256    return (tr_peer*) tr_ptrArrayFindSorted( torrent->peers,
257                                             in_addr,
258                                             peerCompareToAddr );
259}
260
261static struct peer_atom*
262getExistingAtom( const Torrent * t, const struct in_addr * addr )
263{
264    assert( torrentIsLocked( t ) );
265    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
266}
267
268static int
269peerIsInUse( const Torrent * ct, const struct in_addr * addr )
270{
271    Torrent * t = (Torrent*) ct;
272
273    assert( torrentIsLocked ( t ) );
274
275    return getExistingPeer( t, addr )
276        || getExistingHandshake( t->outgoingHandshakes, addr )
277        || getExistingHandshake( t->manager->incomingHandshakes, addr );
278}
279
280static tr_peer*
281peerConstructor( const struct in_addr * in_addr )
282{
283    tr_peer * p;
284    p = tr_new0( tr_peer, 1 );
285    p->rcToClient = tr_rcInit( );
286    p->rcToPeer = tr_rcInit( );
287    memcpy( &p->in_addr, in_addr, sizeof(struct in_addr) );
288    return p;
289}
290
291static tr_peer*
292getPeer( Torrent * torrent, const struct in_addr * in_addr )
293{
294    tr_peer * peer;
295
296    assert( torrentIsLocked( torrent ) );
297
298    peer = getExistingPeer( torrent, in_addr );
299
300    if( peer == NULL ) {
301        peer = peerConstructor( in_addr );
302        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
303    }
304
305    return peer;
306}
307
308static void
309peerDestructor( tr_peer * peer )
310{
311    assert( peer != NULL );
312    assert( peer->msgs != NULL );
313
314    tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
315    tr_peerMsgsFree( peer->msgs );
316
317    tr_peerIoFree( peer->io );
318
319    tr_bitfieldFree( peer->have );
320    tr_bitfieldFree( peer->blame );
321    tr_rcClose( peer->rcToClient );
322    tr_rcClose( peer->rcToPeer );
323    tr_free( peer->client );
324    tr_free( peer );
325}
326
327static void
328removePeer( Torrent * t, tr_peer * peer )
329{
330    tr_peer * removed;
331    struct peer_atom * atom;
332
333    assert( torrentIsLocked( t ) );
334
335    atom = getExistingAtom( t, &peer->in_addr );
336    assert( atom != NULL );
337    atom->time = time( NULL );
338
339    removed = tr_ptrArrayRemoveSorted( t->peers, peer, peerCompare );
340    assert( removed == peer );
341    peerDestructor( removed );
342}
343
344static void
345removeAllPeers( Torrent * t )
346{
347    while( !tr_ptrArrayEmpty( t->peers ) )
348        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
349}
350
351static void
352torrentDestructor( Torrent * t )
353{
354    uint8_t hash[SHA_DIGEST_LENGTH];
355
356    assert( t != NULL );
357    assert( !t->isRunning );
358    assert( t->peers != NULL );
359    assert( torrentIsLocked( t ) );
360    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
361    assert( tr_ptrArrayEmpty( t->peers ) );
362
363    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
364
365    tr_timerFree( &t->reconnectTimer );
366    tr_timerFree( &t->rechokeTimer );
367    tr_timerFree( &t->refillTimer );
368
369    tr_bitfieldFree( t->requested );
370    tr_ptrArrayFree( t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
371    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
372    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
373    tr_ptrArrayFree( t->peers, NULL );
374
375    tr_free( t );
376}
377
378static void peerCallbackFunc( void * vpeer, void * vevent, void * vt );
379
380static Torrent*
381torrentConstructor( tr_peerMgr * manager, tr_torrent * tor )
382{
383    int i;
384    Torrent * t;
385
386    t = tr_new0( Torrent, 1 );
387    t->manager = manager;
388    t->tor = tor;
389    t->pool = tr_ptrArrayNew( );
390    t->peers = tr_ptrArrayNew( );
391    t->webseeds = tr_ptrArrayNew( );
392    t->outgoingHandshakes = tr_ptrArrayNew( );
393    t->requested = tr_bitfieldNew( tor->blockCount );
394    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
395
396    for( i=0; i<tor->info.webseedCount; ++i ) {
397        tr_webseed * w = tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
398        tr_ptrArrayAppend( t->webseeds, w );
399    }
400
401    return t;
402}
403
404/**
405 * For explanation, see http://www.bittorrent.org/fast_extensions.html
406 * Also see the "test-allowed-set" unit test
407 */
408struct tr_bitfield *
409tr_peerMgrGenerateAllowedSet( const uint32_t         k,         /* number of pieces in set */
410                              const uint32_t         sz,        /* number of pieces in torrent */
411                              const uint8_t        * infohash,  /* torrent's SHA1 hash*/
412                              const struct in_addr * ip )       /* peer's address */
413{
414    uint8_t w[SHA_DIGEST_LENGTH + 4];
415    uint8_t x[SHA_DIGEST_LENGTH];
416    tr_bitfield_t * a;
417    uint32_t a_size;
418
419    *(uint32_t*)w = ntohl( htonl(ip->s_addr) & 0xffffff00 );   /* (1) */
420    memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );              /* (2) */
421    tr_sha1( x, w, sizeof( w ), NULL );                        /* (3) */
422
423    a = tr_bitfieldNew( sz );
424    a_size = 0;
425   
426    while( a_size < k )
427    {
428        int i;
429        for ( i=0; i<5 && a_size<k; ++i )                      /* (4) */
430        {
431            uint32_t j = i * 4;                                /* (5) */
432            uint32_t y = ntohl(*(uint32_t*)(x+j));             /* (6) */
433            uint32_t index = y % sz;                           /* (7) */
434            if ( !tr_bitfieldHas( a, index ) ) {               /* (8) */
435                tr_bitfieldAdd( a, index );                    /* (9) */
436                ++a_size;
437            }
438        }
439        tr_sha1( x, x, sizeof( x ), NULL );                    /* (3) */
440    }
441   
442    return a;
443}
444
445tr_peerMgr*
446tr_peerMgrNew( tr_handle * handle )
447{
448    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
449    m->handle = handle;
450    m->torrents = tr_ptrArrayNew( );
451    m->incomingHandshakes = tr_ptrArrayNew( );
452    return m;
453}
454
455void
456tr_peerMgrFree( tr_peerMgr * manager )
457{
458    managerLock( manager );
459
460    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
461     * the item from manager->handshakes, so this is a little roundabout... */
462    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
463        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
464    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
465
466    /* free the torrents. */
467    tr_ptrArrayFree( manager->torrents, (PtrArrayForeachFunc)torrentDestructor );
468
469    managerUnlock( manager );
470    tr_free( manager );
471}
472
473static tr_peer**
474getConnectedPeers( Torrent * t, int * setmeCount )
475{
476    int i, peerCount, connectionCount;
477    tr_peer **peers;
478    tr_peer **ret;
479
480    assert( torrentIsLocked( t ) );
481
482    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
483    ret = tr_new( tr_peer*, peerCount );
484
485    for( i=connectionCount=0; i<peerCount; ++i )
486        if( peers[i]->msgs != NULL )
487            ret[connectionCount++] = peers[i];
488
489    *setmeCount = connectionCount;
490    return ret;
491}
492
493static int
494clientIsDownloadingFrom( const tr_peer * peer )
495{
496    return peer->clientIsInterested && !peer->clientIsChoked;
497}
498
499static int
500clientIsUploadingTo( const tr_peer * peer )
501{
502    return peer->peerIsInterested && !peer->peerIsChoked;
503}
504
505/***
506****
507***/
508
509int
510tr_peerMgrPeerIsSeed( const tr_peerMgr       * mgr,
511                      const uint8_t          * torrentHash,
512                      const struct in_addr   * addr )
513{
514    int isSeed = FALSE;
515    const Torrent * t = NULL;
516    const struct peer_atom * atom = NULL;
517
518    t = getExistingTorrent( (tr_peerMgr*)mgr, torrentHash );
519    if( t )
520        atom = getExistingAtom( t, addr );
521    if( atom )
522        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
523
524    return isSeed;
525}
526
527/***
528****  Refill
529***/
530
531struct tr_refill_piece
532{
533    tr_priority_t priority;
534    int missingBlockCount;
535    uint16_t random;
536    uint32_t piece;
537    uint32_t peerCount;
538};
539
540static int
541compareRefillPiece (const void * aIn, const void * bIn)
542{
543    const struct tr_refill_piece * a = aIn;
544    const struct tr_refill_piece * b = bIn;
545
546    /* fewer missing pieces goes first */
547    if( a->missingBlockCount != b->missingBlockCount )
548        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
549   
550    /* if one piece has a higher priority, it goes first */
551    if( a->priority != b->priority )
552        return a->priority > b->priority ? -1 : 1;
553   
554    /* otherwise if one has fewer peers, it goes first */
555    if (a->peerCount != b->peerCount)
556        return a->peerCount < b->peerCount ? -1 : 1;
557
558#if 0
559    /* otherwise download them in order */
560    return tr_compareUint16( a->piece, b->piece );
561#else
562    /* otherwise go with our random seed */
563    return tr_compareUint16( a->random, b->random );
564#endif
565}
566
567static int
568isPieceInteresting( const tr_torrent  * tor,
569                    tr_piece_index_t    piece )
570{
571    if( tor->info.pieces[piece].dnd ) /* we don't want it */
572        return 0;
573
574    if( tr_cpPieceIsComplete( tor->completion, piece ) ) /* we have it */
575        return 0;
576
577    return 1;
578}
579
580static uint32_t*
581getPreferredPieces( Torrent     * t,
582                    uint32_t    * pieceCount )
583{
584    const tr_torrent * tor = t->tor;
585    const tr_info * inf = &tor->info;
586    tr_piece_index_t i;
587    uint32_t poolSize = 0;
588    uint32_t * pool = tr_new( uint32_t, inf->pieceCount );
589    int peerCount;
590    tr_peer** peers;
591
592    assert( torrentIsLocked( t ) );
593
594    peers = getConnectedPeers( t, &peerCount );
595
596    for( i=0; i<inf->pieceCount; ++i )
597        if( isPieceInteresting( tor, i ) )
598            pool[poolSize++] = i;
599
600    /* sort the pool from most interesting to least... */
601    if( poolSize > 1 )
602    {
603        uint32_t j;
604        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
605
606        for( j=0; j<poolSize; ++j )
607        {
608            int k;
609            const int piece = pool[j];
610            struct tr_refill_piece * setme = p + j;
611
612            setme->piece = piece;
613            setme->priority = inf->pieces[piece].priority;
614            setme->peerCount = 0;
615            setme->random = tr_rand( UINT16_MAX );
616            setme->missingBlockCount = tr_cpMissingBlocksInPiece( tor->completion, piece );
617
618            for( k=0; k<peerCount; ++k ) {
619                const tr_peer * peer = peers[k];
620                if( peer->peerIsInterested && !peer->clientIsChoked && tr_bitfieldHas( peer->have, piece ) )
621                    ++setme->peerCount;
622            }
623        }
624
625        qsort( p, poolSize, sizeof(struct tr_refill_piece), compareRefillPiece );
626
627        for( j=0; j<poolSize; ++j )
628            pool[j] = p[j].piece;
629
630        tr_free( p );
631    }
632
633    tr_free( peers );
634
635    *pieceCount = poolSize;
636    return pool;
637}
638
639static uint64_t*
640getPreferredBlocks( Torrent * t, tr_block_index_t * setmeCount )
641{
642    int s;
643    uint32_t i;
644    uint32_t pieceCount;
645    uint32_t blockCount;
646    uint32_t unreqCount[3], reqCount[3];
647    uint32_t * pieces;
648    uint64_t * ret, * walk;
649    uint64_t * unreq[3], *req[3];
650    const tr_torrent * tor = t->tor;
651
652    assert( torrentIsLocked( t ) );
653
654    pieces = getPreferredPieces( t, &pieceCount );
655
656    /**
657     * Now we walk through those preferred pieces to find all the blocks
658     * are still missing from them.  We put unrequested blocks first,
659     * of course, but by including requested blocks afterwards, endgame
660     * handling happens naturally.
661     *
662     * By doing this once per priority we also effectively get an endgame
663     * mode for each priority level.  The helps keep high priority files
664     * from getting stuck at 99% due of unresponsive peers.
665     */
666
667    /* make temporary bins for the four tiers of blocks */
668    for( i=0; i<3; ++i ) {
669        req[i] = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
670        reqCount[i] = 0;
671        unreq[i] = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
672        unreqCount[i] = 0;
673    }
674
675    /* sort the blocks into our temp bins */
676    for( i=blockCount=0; i<pieceCount; ++i )
677    {
678        const tr_piece_index_t index = pieces[i];
679        const int priorityIndex = tor->info.pieces[index].priority + 1;
680        const tr_block_index_t begin = tr_torPieceFirstBlock( tor, index );
681        const tr_block_index_t end = begin + tr_torPieceCountBlocks( tor, index );
682        tr_block_index_t block;
683
684        assert( tr_bitfieldTestFast( t->requested, end-1 ) );
685
686        for( block=begin; block<end; ++block )
687        {
688            if( tr_cpBlockIsComplete( tor->completion, block ) )
689                continue;
690
691            ++blockCount;
692
693            if( tr_bitfieldHasFast( t->requested, block ) )
694            {
695                const uint32_t n = reqCount[priorityIndex]++;
696                req[priorityIndex][n] = block;
697            }
698            else
699            {
700                const uint32_t n = unreqCount[priorityIndex]++;
701                unreq[priorityIndex][n] = block;
702            }
703        }
704    }
705
706    /* join the bins together, going from highest priority to lowest so
707     * the the blocks we want to request first will be first in the list */
708    ret = walk = tr_new( uint64_t, blockCount );
709    for( s=2; s>=0; --s ) {
710        memcpy( walk, unreq[s], sizeof(uint64_t) * unreqCount[s] );
711        walk += unreqCount[s];
712        memcpy( walk, req[s], sizeof(uint64_t) * reqCount[s] );
713        walk += reqCount[s];
714    }
715    assert( ( walk - ret ) == ( int )blockCount );
716    *setmeCount = blockCount;
717
718    /* cleanup */
719    tr_free( pieces );
720    for( i=0; i<3; ++i ) {
721        tr_free( unreq[i] );
722        tr_free( req[i] );
723    }
724    return ret;
725}
726
727static tr_peer**
728getPeersUploadingToClient( Torrent * t, int * setmeCount )
729{
730    int i;
731    int peerCount = 0;
732    int retCount = 0;
733    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
734    tr_peer ** ret = tr_new( tr_peer*, peerCount );
735
736    /* get a list of peers we're downloading from */
737    for( i=0; i<peerCount; ++i )
738        if( clientIsDownloadingFrom( peers[i] ) )
739            ret[retCount++] = peers[i];
740
741    /* pick a different starting point each time so all peers
742     * get a chance at the first blocks in the queue */
743    if( retCount ) {
744        tr_peer ** tmp = tr_new( tr_peer*, retCount );
745        i = tr_rand( retCount );
746        memcpy( tmp, ret, sizeof(tr_peer*) * retCount );
747        memcpy( ret, tmp+i, sizeof(tr_peer*) * (retCount-i) );
748        memcpy( ret+(retCount-i), tmp, sizeof(tr_peer*) * i );
749        tr_free( tmp );
750    }
751
752    *setmeCount = retCount;
753    return ret;
754}
755
756static int
757refillPulse( void * vtorrent )
758{
759    Torrent * t = vtorrent;
760    tr_torrent * tor = t->tor;
761    tr_block_index_t i;
762    int peerCount;
763    int webseedCount;
764    tr_peer ** peers;
765    tr_webseed ** webseeds;
766    tr_block_index_t blockCount;
767    uint64_t * blocks;
768
769    if( !t->isRunning )
770        return TRUE;
771    if( tr_torrentIsSeed( t->tor ) )
772        return TRUE;
773
774    torrentLock( t );
775    tordbg( t, "Refilling Request Buffers..." );
776
777    blocks = getPreferredBlocks( t, &blockCount );
778    peers = getPeersUploadingToClient( t, &peerCount );
779    webseedCount = tr_ptrArraySize( t->webseeds );
780    webseeds = tr_memdup( tr_ptrArrayBase(t->webseeds), webseedCount*sizeof(tr_webseed*) );
781
782
783#warning do not check in this line
784peerCount = 0;
785
786    for( i=0; (webseedCount || peerCount) && i<blockCount; ++i )
787    {
788        int j;
789        int handled = FALSE;
790
791        const tr_block_index_t block = blocks[i];
792        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
793        const uint32_t begin = (block * tor->blockSize) - (index * tor->info.pieceSize);
794        const uint32_t length = tr_torBlockCountBytes( tor, block );
795fprintf( stderr, "next block in the refill pulse is %d\n", (int)block );
796
797        assert( tr_torrentReqIsValid( tor, index, begin, length ) );
798        assert( _tr_block( tor, index, begin ) == block );
799        assert( begin < tr_torPieceCountBytes( tor, index ) );
800        assert( (begin + length) <= tr_torPieceCountBytes( tor, index ) );
801
802        /* find a peer who can ask for this block */
803        for( j=0; !handled && j<peerCount; )
804        {
805            const int val = tr_peerMsgsAddRequest( peers[j]->msgs, index, begin, length );
806            switch( val )
807            {
808                case TR_ADDREQ_FULL: 
809                case TR_ADDREQ_CLIENT_CHOKED:
810                    memmove( peers+j, peers+j+1, sizeof(tr_peer*)*(--peerCount-j) );
811                    break;
812                case TR_ADDREQ_MISSING: 
813                case TR_ADDREQ_DUPLICATE: 
814                    ++j;
815                    break;
816                case TR_ADDREQ_OK:
817                    tr_bitfieldAdd( t->requested, block );
818                    handled = TRUE;
819                    break;
820                default:
821                    assert( 0 && "unhandled value" );
822                    break;
823            }
824        }
825
826        /* maybe one of the webseeds can do it */
827        for( j=0; !handled && j<webseedCount; )
828        {
829            const int val = tr_webseedAddRequest( webseeds[j], index, begin, length );
830            switch( val )
831            {
832                case TR_ADDREQ_FULL: 
833                    memmove( webseeds+j, webseeds+j+1, sizeof(tr_webseed*)*(--webseedCount-j) );
834                    break;
835                case TR_ADDREQ_OK:
836                    tr_bitfieldAdd( t->requested, block );
837                    handled = TRUE;
838                    break;
839                default:
840                    assert( 0 && "unhandled value" );
841                    break;
842            }
843        }
844    }
845
846    /* cleanup */
847    tr_free( webseeds );
848    tr_free( peers );
849    tr_free( blocks );
850
851    t->refillTimer = NULL;
852    torrentUnlock( t );
853    return FALSE;
854}
855
856static void
857broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
858{
859    int i, size;
860    tr_peer ** peers;
861
862    assert( torrentIsLocked( t ) );
863
864    peers = getConnectedPeers( t, &size );
865    for( i=0; i<size; ++i )
866        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
867    tr_free( peers );
868}
869
870static void
871addStrike( Torrent * t, tr_peer * peer )
872{
873    tordbg( t, "increasing peer %s strike count to %d", tr_peerIoAddrStr(&peer->in_addr,peer->port), peer->strikes+1 );
874
875    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
876    {
877        struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
878        atom->myflags |= MYFLAG_BANNED;
879        peer->doPurge = 1;
880        tordbg( t, "banning peer %s", tr_peerIoAddrStr(&atom->addr,atom->port) );
881    }
882}
883
884static void
885gotBadPiece( Torrent * t, tr_piece_index_t pieceIndex )
886{
887    tr_torrent * tor = t->tor;
888    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
889    tor->corruptCur += byteCount;
890    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
891}
892
893static void
894peerCallbackFunc( void * vpeer, void * vevent, void * vt )
895{
896    tr_peer * peer = vpeer; /* may be NULL if peer is a webseed */
897    Torrent * t = (Torrent *) vt;
898    const tr_peer_event * e = vevent;
899
900    torrentLock( t );
901
902    switch( e->eventType )
903    {
904        case TR_PEER_NEED_REQ:
905            if( t->refillTimer == NULL )
906                t->refillTimer = tr_timerNew( t->manager->handle,
907                                              refillPulse, t,
908                                              REFILL_PERIOD_MSEC );
909            break;
910
911        case TR_PEER_CANCEL:
912            tr_bitfieldRem( t->requested, _tr_block( t->tor, e->pieceIndex, e->offset ) );
913            break;
914
915        case TR_PEER_PEER_GOT_DATA: {
916            const time_t now = time( NULL );
917            tr_torrent * tor = t->tor;
918            tor->activityDate = now;
919            tor->uploadedCur += e->length;
920            tr_rcTransferred( tor->upload, e->length );
921            tr_rcTransferred( tor->handle->upload, e->length );
922            tr_statsAddUploaded( tor->handle, e->length );
923            if( peer ) {
924                struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
925                atom->piece_data_time = time( NULL );
926            }
927            break;
928        }
929
930        case TR_PEER_CLIENT_GOT_DATA: {
931            const time_t now = time( NULL );
932            tr_torrent * tor = t->tor;
933            tor->activityDate = now;
934            tor->downloadedCur += e->length;
935            tr_rcTransferred( tor->download, e->length );
936            tr_rcTransferred( tor->handle->download, e->length );
937            tr_statsAddDownloaded( tor->handle, e->length );
938            if( peer ) {
939                struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
940                atom->piece_data_time = time( NULL );
941            }
942            break;
943        }
944
945        case TR_PEER_PEER_PROGRESS: {
946            if( peer ) {
947                struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
948                const int peerIsSeed = e->progress >= 1.0;
949                if( peerIsSeed ) {
950                    tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr(&atom->addr,atom->port) );
951                    atom->flags |= ADDED_F_SEED_FLAG;
952                } else {
953                    tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr(&atom->addr,atom->port) );
954                    atom->flags &= ~ADDED_F_SEED_FLAG;
955                }
956            }
957            break;
958        }
959
960        case TR_PEER_CLIENT_GOT_BLOCK:
961        {
962            tr_torrent * tor = t->tor;
963
964            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
965
966            tr_cpBlockAdd( tor->completion, block );
967
968            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
969
970            if( tr_cpPieceIsComplete( tor->completion, e->pieceIndex ) )
971            {
972                const tr_piece_index_t p = e->pieceIndex;
973                const tr_errno err = tr_ioTestPiece( tor, p );
974
975                if( err ) {
976                    tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test: %s" ),
977                               (unsigned long)p, tr_errorString( err ) );
978                }
979
980                tr_torrentSetHasPiece( tor, p, !err );
981                tr_torrentSetPieceChecked( tor, p, TRUE );
982                tr_peerMgrSetBlame( tor->handle->peerMgr, tor->info.hash, p, !err );
983
984                if( err )
985                    gotBadPiece( t, p );
986
987                tr_torrentRecheckCompleteness( tor );
988            }
989            break;
990        }
991
992        case TR_PEER_ERROR:
993            if( TR_ERROR_IS_IO( e->err ) ) {
994                t->tor->error = e->err;
995                tr_strlcpy( t->tor->errorString, tr_errorString( e->err ), sizeof(t->tor->errorString) );
996                tr_torrentStop( t->tor );
997            } else if( e->err == TR_ERROR_ASSERT ) {
998                addStrike( t, peer );
999            }
1000            peer->doPurge = 1;
1001            break;
1002
1003        default:
1004            assert(0);
1005    }
1006
1007    torrentUnlock( t );
1008}
1009
1010static void
1011ensureAtomExists( Torrent * t, const struct in_addr * addr, uint16_t port, uint8_t flags, uint8_t from )
1012{
1013    if( getExistingAtom( t, addr ) == NULL )
1014    {
1015        struct peer_atom * a;
1016        a = tr_new0( struct peer_atom, 1 );
1017        a->addr = *addr;
1018        a->port = port;
1019        a->flags = flags;
1020        a->from = from;
1021        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr(&a->addr,a->port) );
1022        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
1023    }
1024}
1025
1026static int
1027getMaxPeerCount( const tr_torrent * tor UNUSED )
1028{
1029    return tor->maxConnectedPeers;
1030}
1031
1032/* FIXME: this is kind of a mess. */
1033static void
1034myHandshakeDoneCB( tr_handshake    * handshake,
1035                   tr_peerIo       * io,
1036                   int               isConnected,
1037                   const uint8_t   * peer_id,
1038                   void            * vmanager )
1039{
1040    int ok = isConnected;
1041    uint16_t port;
1042    const struct in_addr * addr;
1043    tr_peerMgr * manager = (tr_peerMgr*) vmanager;
1044    Torrent * t;
1045    tr_handshake * ours;
1046
1047    assert( io != NULL );
1048    assert( isConnected==0 || isConnected==1 );
1049
1050    t = tr_peerIoHasTorrentHash( io )
1051        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1052        : NULL;
1053
1054    if( tr_peerIoIsIncoming ( io ) )
1055        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
1056                                        handshake, handshakeCompare );
1057    else if( t != NULL )
1058        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
1059                                        handshake, handshakeCompare );
1060    else
1061        ours = handshake;
1062
1063    assert( ours != NULL );
1064    assert( ours == handshake );
1065
1066    if( t != NULL )
1067        torrentLock( t );
1068
1069    addr = tr_peerIoGetAddress( io, &port );
1070
1071    if( !ok || !t || !t->isRunning )
1072    {
1073        if( t ) {
1074            struct peer_atom * atom = getExistingAtom( t, addr );
1075            if( atom )
1076                ++atom->numFails;
1077        }
1078
1079        tr_peerIoFree( io );
1080    }
1081    else /* looking good */
1082    {
1083        struct peer_atom * atom;
1084        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1085        atom = getExistingAtom( t, addr );
1086
1087        if( atom->myflags & MYFLAG_BANNED )
1088        {
1089            tordbg( t, "banned peer %s tried to reconnect", tr_peerIoAddrStr(&atom->addr,atom->port) );
1090            tr_peerIoFree( io );
1091        }
1092        else if( tr_ptrArraySize( t->peers ) >= getMaxPeerCount( t->tor ) )
1093        {
1094            tr_peerIoFree( io );
1095        }
1096        else
1097        {
1098            tr_peer * peer = getExistingPeer( t, addr );
1099
1100            if( peer != NULL ) /* we already have this peer */
1101            {
1102                tr_peerIoFree( io );
1103            }
1104            else
1105            {
1106                peer = getPeer( t, addr );
1107                tr_free( peer->client );
1108
1109                if( !peer_id )
1110                    peer->client = NULL;
1111                else {
1112                    char client[128];
1113                    tr_clientForId( client, sizeof( client ), peer_id );
1114                    peer->client = tr_strdup( client );
1115                }
1116                peer->port = port;
1117                peer->io = io;
1118                peer->msgs = tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1119                atom->time = time( NULL );
1120            }
1121        }
1122    }
1123
1124    if( t != NULL )
1125        torrentUnlock( t );
1126}
1127
1128void
1129tr_peerMgrAddIncoming( tr_peerMgr      * manager,
1130                       struct in_addr  * addr,
1131                       uint16_t          port,
1132                       int               socket )
1133{
1134    managerLock( manager );
1135
1136    if( tr_blocklistHasAddress( manager->handle, addr ) )
1137    {
1138        tr_dbg( "Banned IP address \"%s\" tried to connect to us",
1139                inet_ntoa( *addr ) );
1140        tr_netClose( socket );
1141    }
1142    else if( getExistingHandshake( manager->incomingHandshakes, addr ) )
1143    {
1144        tr_netClose( socket );
1145    }
1146    else /* we don't have a connetion to them yet... */
1147    {
1148        tr_peerIo * io;
1149        tr_handshake * handshake;
1150
1151        io = tr_peerIoNewIncoming( manager->handle, addr, port, socket );
1152
1153        handshake = tr_handshakeNew( io,
1154                                     manager->handle->encryptionMode,
1155                                     myHandshakeDoneCB,
1156                                     manager );
1157
1158        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake, handshakeCompare );
1159    }
1160
1161    managerUnlock( manager );
1162}
1163
1164void
1165tr_peerMgrAddPex( tr_peerMgr     * manager,
1166                  const uint8_t  * torrentHash,
1167                  uint8_t          from,
1168                  const tr_pex   * pex )
1169{
1170    Torrent * t;
1171    managerLock( manager );
1172
1173    t = getExistingTorrent( manager, torrentHash );
1174    if( !tr_blocklistHasAddress( t->manager->handle, &pex->in_addr ) )
1175        ensureAtomExists( t, &pex->in_addr, pex->port, pex->flags, from );
1176
1177    managerUnlock( manager );
1178}
1179
1180tr_pex *
1181tr_peerMgrCompactToPex( const void  * compact,
1182                        size_t        compactLen,
1183                        const char  * added_f,
1184                        size_t      * pexCount )
1185{
1186    size_t i;
1187    size_t n = compactLen / 6;
1188    const uint8_t * walk = compact;
1189    tr_pex * pex = tr_new0( tr_pex, n );
1190    for( i=0; i<n; ++i ) {
1191        memcpy( &pex[i].in_addr, walk, 4 ); walk += 4;
1192        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1193        if( added_f )
1194            pex[i].flags = added_f[i];
1195    }
1196    *pexCount = n;
1197    return pex;
1198}
1199
1200/**
1201***
1202**/
1203
1204void
1205tr_peerMgrSetBlame( tr_peerMgr     * manager,
1206                    const uint8_t  * torrentHash,
1207                    int              pieceIndex,
1208                    int              success )
1209{
1210    if( !success )
1211    {
1212        int peerCount, i;
1213        Torrent * t = getExistingTorrent( manager, torrentHash );
1214        tr_peer ** peers;
1215
1216        assert( torrentIsLocked( t ) );
1217
1218        peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1219        for( i=0; i<peerCount; ++i )
1220        {
1221            tr_peer * peer = peers[i];
1222            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1223            {
1224                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1225                           tr_peerIoAddrStr(&peer->in_addr,peer->port),
1226                           pieceIndex, (int)peer->strikes+1 );
1227                addStrike( t, peer );
1228            }
1229        }
1230    }
1231}
1232
1233int
1234tr_pexCompare( const void * va, const void * vb )
1235{
1236    const tr_pex * a = (const tr_pex *) va;
1237    const tr_pex * b = (const tr_pex *) vb;
1238    int i = memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
1239    if( i ) return i;
1240    if( a->port < b->port ) return -1;
1241    if( a->port > b->port ) return 1;
1242    return 0;
1243}
1244
1245int tr_pexCompare( const void * a, const void * b );
1246
1247static int
1248peerPrefersCrypto( const tr_peer * peer )
1249{
1250    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1251        return TRUE;
1252
1253    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1254        return FALSE;
1255
1256    return tr_peerIoIsEncrypted( peer->io );
1257};
1258
1259int
1260tr_peerMgrGetPeers( tr_peerMgr      * manager,
1261                    const uint8_t   * torrentHash,
1262                    tr_pex         ** setme_pex )
1263{
1264    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1265    int i, peerCount;
1266    const tr_peer ** peers;
1267    tr_pex * pex;
1268    tr_pex * walk;
1269
1270    managerLock( manager );
1271
1272    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1273    pex = walk = tr_new( tr_pex, peerCount );
1274
1275    for( i=0; i<peerCount; ++i, ++walk )
1276    {
1277        const tr_peer * peer = peers[i];
1278
1279        walk->in_addr = peer->in_addr;
1280
1281        walk->port = peer->port;
1282
1283        walk->flags = 0;
1284        if( peerPrefersCrypto(peer) )  walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1285        if( peer->progress >= 1.0 )    walk->flags |= ADDED_F_SEED_FLAG;
1286    }
1287
1288    assert( ( walk - pex ) == peerCount );
1289    qsort( pex, peerCount, sizeof(tr_pex), tr_pexCompare );
1290    *setme_pex = pex;
1291
1292    managerUnlock( manager );
1293
1294    return peerCount;
1295}
1296
1297static int reconnectPulse( void * vtorrent );
1298static int rechokePulse( void * vtorrent );
1299
1300void
1301tr_peerMgrStartTorrent( tr_peerMgr     * manager,
1302                        const uint8_t  * torrentHash )
1303{
1304    Torrent * t;
1305
1306    managerLock( manager );
1307
1308    t = getExistingTorrent( manager, torrentHash );
1309
1310    assert( t != NULL );
1311    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1312    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1313
1314    if( !t->isRunning )
1315    {
1316        t->isRunning = 1;
1317
1318        t->reconnectTimer = tr_timerNew( t->manager->handle,
1319                                         reconnectPulse, t,
1320                                         RECONNECT_PERIOD_MSEC );
1321
1322        t->rechokeTimer = tr_timerNew( t->manager->handle,
1323                                       rechokePulse, t,
1324                                       RECHOKE_PERIOD_MSEC );
1325
1326        reconnectPulse( t );
1327
1328        rechokePulse( t );
1329    }
1330
1331    managerUnlock( manager );
1332}
1333
1334static void
1335stopTorrent( Torrent * t )
1336{
1337    assert( torrentIsLocked( t ) );
1338
1339    t->isRunning = 0;
1340    tr_timerFree( &t->rechokeTimer );
1341    tr_timerFree( &t->reconnectTimer );
1342
1343    /* disconnect the peers. */
1344    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)peerDestructor );
1345    tr_ptrArrayClear( t->peers );
1346
1347    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1348     * which removes the handshake from t->outgoingHandshakes... */
1349    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1350        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1351}
1352void
1353tr_peerMgrStopTorrent( tr_peerMgr     * manager,
1354                       const uint8_t  * torrentHash)
1355{
1356    managerLock( manager );
1357
1358    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1359
1360    managerUnlock( manager );
1361}
1362
1363void
1364tr_peerMgrAddTorrent( tr_peerMgr * manager,
1365                      tr_torrent * tor )
1366{
1367    Torrent * t;
1368
1369    managerLock( manager );
1370
1371    assert( tor != NULL );
1372    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1373
1374    t = torrentConstructor( manager, tor );
1375    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1376
1377    managerUnlock( manager );
1378}
1379
1380void
1381tr_peerMgrRemoveTorrent( tr_peerMgr     * manager,
1382                         const uint8_t  * torrentHash )
1383{
1384    Torrent * t;
1385
1386    managerLock( manager );
1387
1388    t = getExistingTorrent( manager, torrentHash );
1389    assert( t != NULL );
1390    stopTorrent( t );
1391    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1392    torrentDestructor( t );
1393
1394    managerUnlock( manager );
1395}
1396
1397void
1398tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1399                               const uint8_t    * torrentHash,
1400                               int8_t           * tab,
1401                               int                tabCount )
1402{
1403    int i;
1404    const Torrent * t;
1405    const tr_torrent * tor;
1406    float interval;
1407
1408    managerLock( (tr_peerMgr*)manager );
1409
1410    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1411    tor = t->tor;
1412    interval = tor->info.pieceCount / (float)tabCount;
1413
1414    memset( tab, 0, tabCount );
1415
1416    for( i=0; i<tabCount; ++i )
1417    {
1418        const int piece = i * interval;
1419
1420        if( tor == NULL )
1421            tab[i] = 0;
1422        else if( tr_cpPieceIsComplete( tor->completion, piece ) )
1423            tab[i] = -1;
1424        else {
1425            int j, peerCount;
1426            const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1427            for( j=0; j<peerCount; ++j )
1428                if( tr_bitfieldHas( peers[j]->have, i ) )
1429                    ++tab[i];
1430        }
1431    }
1432
1433    managerUnlock( (tr_peerMgr*)manager );
1434}
1435
1436/* Returns the pieces that are available from peers */
1437tr_bitfield*
1438tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1439                        const uint8_t    * torrentHash )
1440{
1441    int i, size;
1442    Torrent * t;
1443    tr_peer ** peers;
1444    tr_bitfield * pieces;
1445    managerLock( (tr_peerMgr*)manager );
1446
1447    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1448    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1449    peers = getConnectedPeers( t, &size );
1450    for( i=0; i<size; ++i )
1451        tr_bitfieldOr( pieces, peers[i]->have );
1452
1453    managerUnlock( (tr_peerMgr*)manager );
1454    tr_free( peers );
1455    return pieces;
1456}
1457
1458int
1459tr_peerMgrHasConnections( const tr_peerMgr * manager,
1460                          const uint8_t    * torrentHash )
1461{
1462    int ret;
1463    const Torrent * t;
1464    managerLock( (tr_peerMgr*)manager );
1465
1466    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1467    ret = t && tr_ptrArraySize( t->peers );
1468
1469    managerUnlock( (tr_peerMgr*)manager );
1470    return ret;
1471}
1472
1473void
1474tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1475                        const uint8_t    * torrentHash,
1476                        int              * setmePeersKnown,
1477                        int              * setmePeersConnected,
1478                        int              * setmeSeedsConnected,
1479                        int              * setmePeersSendingToUs,
1480                        int              * setmePeersGettingFromUs,
1481                        int              * setmePeersFrom )
1482{
1483    int i, size;
1484    const Torrent * t;
1485    const tr_peer ** peers;
1486
1487    managerLock( (tr_peerMgr*)manager );
1488
1489    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1490    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1491
1492    *setmePeersKnown          = tr_ptrArraySize( t->pool );
1493    *setmePeersConnected      = 0;
1494    *setmeSeedsConnected      = 0;
1495    *setmePeersSendingToUs    = 0;
1496    *setmePeersGettingFromUs  = 0;
1497
1498    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1499        setmePeersFrom[i] = 0;
1500
1501    for( i=0; i<size; ++i )
1502    {
1503        const tr_peer * peer = peers[i];
1504        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1505
1506        if( peer->io == NULL ) /* not connected */
1507            continue;
1508
1509        ++*setmePeersConnected;
1510
1511        ++setmePeersFrom[atom->from];
1512
1513        if( clientIsDownloadingFrom( peer ) )
1514            ++*setmePeersSendingToUs;
1515
1516        if( clientIsUploadingTo( peer ) )
1517            ++*setmePeersGettingFromUs;
1518
1519        if( atom->flags & ADDED_F_SEED_FLAG )
1520            ++*setmeSeedsConnected;
1521    }
1522
1523    managerUnlock( (tr_peerMgr*)manager );
1524}
1525
1526struct tr_peer_stat *
1527tr_peerMgrPeerStats( const tr_peerMgr  * manager,
1528                     const uint8_t     * torrentHash,
1529                     int               * setmeCount UNUSED )
1530{
1531    int i, size;
1532    const Torrent * t;
1533    tr_peer ** peers;
1534    tr_peer_stat * ret;
1535
1536    assert( manager != NULL );
1537    managerLock( (tr_peerMgr*)manager );
1538
1539    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1540    peers = getConnectedPeers( (Torrent*)t, &size );
1541    ret = tr_new0( tr_peer_stat, size );
1542
1543    for( i=0; i<size; ++i )
1544    {
1545        char * pch;
1546        const tr_peer * peer = peers[i];
1547        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1548        tr_peer_stat * stat = ret + i;
1549
1550        tr_netNtop( &peer->in_addr, stat->addr, sizeof(stat->addr) );
1551        tr_strlcpy( stat->client, (peer->client ? peer->client : ""), sizeof(stat->client) );
1552        stat->port               = peer->port;
1553        stat->from               = atom->from;
1554        stat->progress           = peer->progress;
1555        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1556        stat->uploadToRate       = peer->rateToPeer;
1557        stat->downloadFromRate   = peer->rateToClient;
1558        stat->peerIsChoked       = peer->peerIsChoked;
1559        stat->peerIsInterested   = peer->peerIsInterested;
1560        stat->clientIsChoked     = peer->clientIsChoked;
1561        stat->clientIsInterested = peer->clientIsInterested;
1562        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1563        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1564        stat->isUploadingTo      = clientIsUploadingTo( peer );
1565
1566        pch = stat->flagStr;
1567        if( t->optimistic == peer ) *pch++ = 'O';
1568        if( stat->isDownloadingFrom ) *pch++ = 'D';
1569        else if( stat->clientIsInterested ) *pch++ = 'd';
1570        if( stat->isUploadingTo ) *pch++ = 'U';
1571        else if( stat->peerIsInterested ) *pch++ = 'u';
1572        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
1573        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1574        if( stat->isEncrypted ) *pch++ = 'E';
1575        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1576        if( stat->isIncoming ) *pch++ = 'I';
1577        *pch = '\0';
1578    }
1579
1580    *setmeCount = size;
1581    tr_free( peers );
1582
1583    managerUnlock( (tr_peerMgr*)manager );
1584    return ret;
1585}
1586
1587/**
1588***
1589**/
1590
1591struct ChokeData
1592{
1593    uint8_t doUnchoke;
1594    uint8_t isInterested;
1595    uint32_t rate;
1596    tr_peer * peer;
1597};
1598
1599static int
1600compareChoke( const void * va, const void * vb )
1601{
1602    const struct ChokeData * a = va;
1603    const struct ChokeData * b = vb;
1604    return -tr_compareUint32( a->rate, b->rate );
1605}
1606
1607static int
1608isNew( const tr_peer * peer )
1609{
1610    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1611}
1612
1613static int
1614isSame( const tr_peer * peer )
1615{
1616    return peer && peer->client && strstr( peer->client, "Transmission" );
1617}
1618
1619/**
1620***
1621**/
1622
1623static int
1624getWeightedRate( const tr_peer * peer, int clientIsSeed )
1625{
1626    return (int)( 10.0 * ( clientIsSeed ? peer->rateToPeer
1627                                        : peer->rateToClient ) );
1628}
1629
1630static void
1631rechoke( Torrent * t )
1632{
1633    int i, peerCount, size, unchokedInterested;
1634    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1635    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1636    const int clientIsSeed = tr_torrentIsSeed( t->tor );
1637
1638    assert( torrentIsLocked( t ) );
1639   
1640    /* sort the peers by preference and rate */
1641    for( i=0, size=0; i<peerCount; ++i )
1642    {
1643        tr_peer * peer = peers[i];
1644        if( peer->progress >= 1.0 ) /* choke all seeds */
1645            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1646        else {
1647            struct ChokeData * node = &choke[size++];
1648            node->peer = peer;
1649            node->isInterested = peer->peerIsInterested;
1650            node->rate = getWeightedRate( peer, clientIsSeed );
1651        }
1652    }
1653
1654    qsort( choke, size, sizeof(struct ChokeData), compareChoke );
1655
1656    /**
1657     * Reciprocation and number of uploads capping is managed by unchoking
1658     * the N peers which have the best upload rate and are interested.
1659     * This maximizes the client's download rate. These N peers are
1660     * referred to as downloaders, because they are interested in downloading
1661     * from the client.
1662     *
1663     * Peers which have a better upload rate (as compared to the downloaders)
1664     * but aren't interested get unchoked. If they become interested, the
1665     * downloader with the worst upload rate gets choked. If a client has
1666     * a complete file, it uses its upload rate rather than its download
1667     * rate to decide which peers to unchoke.
1668     */
1669    unchokedInterested = 0;
1670    for( i=0; i<size && unchokedInterested<MAX_UNCHOKED_PEERS; ++i ) {
1671        choke[i].doUnchoke = 1;
1672        if( choke[i].isInterested )
1673            ++unchokedInterested;
1674    }
1675
1676    /* optimistic unchoke */
1677    if( i < size )
1678    {
1679        int n;
1680        struct ChokeData * c;
1681        tr_ptrArray * randPool = tr_ptrArrayNew( );
1682
1683        for( ; i<size; ++i )
1684        {
1685            if( choke[i].isInterested )
1686            {
1687                const tr_peer * peer = choke[i].peer;
1688                int x=1, y;
1689                if( isNew( peer ) ) x *= 3;
1690                if( isSame( peer ) ) x *= 3;
1691                for( y=0; y<x; ++y )
1692                    tr_ptrArrayAppend( randPool, choke );
1693            }
1694        }
1695
1696        if(( n = tr_ptrArraySize( randPool )))
1697        {
1698            c = tr_ptrArrayNth( randPool, tr_rand( n ));
1699            c->doUnchoke = 1;
1700            t->optimistic = c->peer;
1701        }
1702
1703        tr_ptrArrayFree( randPool, NULL );
1704    }
1705
1706    for( i=0; i<size; ++i )
1707        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1708
1709    /* cleanup */
1710    tr_free( choke );
1711    tr_free( peers );
1712}
1713
1714static int
1715rechokePulse( void * vtorrent )
1716{
1717    Torrent * t = vtorrent;
1718    torrentLock( t );
1719    rechoke( t );
1720    torrentUnlock( t );
1721    return TRUE;
1722}
1723
1724/***
1725****
1726****  Life and Death
1727****
1728***/
1729
1730static int
1731shouldPeerBeClosed( const Torrent * t, const tr_peer * peer, int peerCount )
1732{
1733    const tr_torrent * tor = t->tor;
1734    const time_t now = time( NULL );
1735    const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1736
1737    /* if it's marked for purging, close it */
1738    if( peer->doPurge ) {
1739        tordbg( t, "purging peer %s because its doPurge flag is set", tr_peerIoAddrStr(&atom->addr,atom->port) );
1740        return TRUE;
1741    }
1742
1743    /* if we're seeding and the peer has everything we have,
1744     * and enough time has passed for a pex exchange, then disconnect */
1745    if( tr_torrentIsSeed( tor ) ) {
1746        int peerHasEverything;
1747        if( atom->flags & ADDED_F_SEED_FLAG )
1748            peerHasEverything = TRUE;
1749        else if( peer->progress < tr_cpPercentDone( tor->completion ) )
1750            peerHasEverything = FALSE;
1751        else {
1752            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( tor->completion ) );
1753            tr_bitfieldDifference( tmp, peer->have );
1754            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
1755            tr_bitfieldFree( tmp );
1756        }
1757        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30) ) ) {
1758            tordbg( t, "purging peer %s because we're both seeds", tr_peerIoAddrStr(&atom->addr,atom->port) );
1759            return TRUE;
1760        }
1761    }
1762
1763    /* disconnect if it's been too long since piece data has been transferred.
1764     * this is on a sliding scale based on number of available peers... */
1765    {
1766        const int relaxStrictnessIfFewerThanN = (int)((getMaxPeerCount(tor) * 0.9) + 0.5);
1767        /* if we have >= relaxIfFewerThan, strictness is 100%.
1768         * if we have zero connections, strictness is 0% */
1769        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
1770            ? 1.0
1771            : peerCount / (float)relaxStrictnessIfFewerThanN;
1772        const int lo = MIN_UPLOAD_IDLE_SECS;
1773        const int hi = MAX_UPLOAD_IDLE_SECS;
1774        const int limit = lo + ((hi-lo) * strictness);
1775        const time_t then = peer->pieceDataActivityDate;
1776        const int idleTime = then ? (now-then) : 0;
1777        if( idleTime > limit ) {
1778            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
1779                       tr_peerIoAddrStr(&atom->addr,atom->port), idleTime );
1780            return TRUE;
1781        }
1782    }
1783
1784    return FALSE;
1785}
1786
1787static tr_peer **
1788getPeersToClose( Torrent * t, int * setmeSize )
1789{
1790    int i, peerCount, outsize;
1791    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( t->peers, &peerCount );
1792    struct tr_peer ** ret = tr_new( tr_peer*, peerCount );
1793
1794    assert( torrentIsLocked( t ) );
1795
1796    for( i=outsize=0; i<peerCount; ++i )
1797        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
1798            ret[outsize++] = peers[i];
1799
1800    *setmeSize = outsize;
1801    return ret;
1802}
1803
1804static int
1805compareCandidates( const void * va, const void * vb )
1806{
1807    const struct peer_atom * a = * (const struct peer_atom**) va;
1808    const struct peer_atom * b = * (const struct peer_atom**) vb;
1809    int i;
1810
1811    if( a->piece_data_time > b->piece_data_time ) return -1;
1812    if( a->piece_data_time < b->piece_data_time ) return  1;
1813
1814    if(( i = tr_compareUint16( a->numFails, b->numFails )))
1815        return i;
1816
1817    if( a->time != b->time )
1818        return a->time < b->time ? -1 : 1;
1819
1820    return 0;
1821}
1822
1823static struct peer_atom **
1824getPeerCandidates( Torrent * t, int * setmeSize )
1825{
1826    int i, atomCount, retCount;
1827    struct peer_atom ** atoms;
1828    struct peer_atom ** ret;
1829    const time_t now = time( NULL );
1830    const int seed = tr_torrentIsSeed( t->tor );
1831
1832    assert( torrentIsLocked( t ) );
1833
1834    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &atomCount );
1835    ret = tr_new( struct peer_atom*, atomCount );
1836    for( i=retCount=0; i<atomCount; ++i )
1837    {
1838        struct peer_atom * atom = atoms[i];
1839
1840        /* peer fed us too much bad data ... we only keep it around
1841         * now to weed it out in case someone sends it to us via pex */
1842        if( atom->myflags & MYFLAG_BANNED )
1843            continue;
1844
1845        /* peer was unconnectable before, so we're not going to keep trying.
1846         * this is needs a separate flag from `banned', since if they try
1847         * to connect to us later, we'll let them in */
1848        if( atom->myflags & MYFLAG_UNREACHABLE )
1849            continue;
1850
1851        /* we don't need two connections to the same peer... */
1852        if( peerIsInUse( t, &atom->addr ) )
1853            continue;
1854
1855        /* no need to connect if we're both seeds... */
1856        if( seed && (atom->flags & ADDED_F_SEED_FLAG) )
1857            continue;
1858
1859        /* we're wasting our time trying to connect to this bozo. */
1860        if( atom->numFails > 3 )
1861            continue;
1862
1863        /* If we were connected to this peer recently and transferring
1864         * piece data, try to reconnect -- network troubles may have
1865         * disconnected us.  but if we weren't sharing piece data,
1866         * hold off on this peer to give another one a try instead */
1867        if( ( now - atom->piece_data_time ) > 30 )
1868        {
1869            int minWait = (60 * 10); /* ten minutes */
1870            int maxWait = (60 * 30); /* thirty minutes */
1871            int wait = atom->numFails * minWait;
1872            if( wait < minWait ) wait = minWait;
1873            if( wait > maxWait ) wait = maxWait;
1874            if( ( now - atom->time ) < wait ) {
1875                tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
1876                        i, tr_peerIoAddrStr(&atom->addr,atom->port), wait );
1877                continue;
1878            }
1879        }
1880
1881        /* Don't connect to peers in our blocklist */
1882        if( tr_blocklistHasAddress( t->manager->handle, &atom->addr ) )
1883            continue;
1884
1885        ret[retCount++] = atom;
1886    }
1887
1888    qsort( ret, retCount, sizeof(struct peer_atom*), compareCandidates );
1889    *setmeSize = retCount;
1890    return ret;
1891}
1892
1893static int
1894reconnectPulse( void * vtorrent )
1895{
1896    Torrent * t = vtorrent;
1897    static time_t prevTime = 0;
1898    static int newConnectionsThisSecond = 0;
1899    time_t now;
1900
1901    torrentLock( t );
1902
1903    now = time( NULL );
1904    if( prevTime != now )
1905    {
1906        prevTime = now;
1907        newConnectionsThisSecond = 0;
1908    }
1909
1910    if( !t->isRunning )
1911    {
1912        removeAllPeers( t );
1913    }
1914    else
1915    {
1916        int i, nCandidates, nBad;
1917        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
1918        struct tr_peer ** connections = getPeersToClose( t, &nBad );
1919
1920        if( nBad || nCandidates )
1921            tordbg( t, "reconnect pulse for [%s]: %d bad connections, "
1922                       "%d connection candidates, %d atoms, max per pulse is %d",
1923                       t->tor->info.name, nBad, nCandidates,
1924                       tr_ptrArraySize(t->pool),
1925                       (int)MAX_RECONNECTIONS_PER_PULSE );
1926
1927        /* disconnect some peers.
1928           if we got transferred piece data, then they might be good peers,
1929           so reset their `numFails' weight to zero.  otherwise we connected
1930           to them fruitlessly, so mark it as another fail */
1931        for( i=0; i<nBad; ++i ) {
1932            tr_peer * peer = connections[i];
1933            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1934            if( peer->pieceDataActivityDate )
1935                atom->numFails = 0;
1936            else
1937                ++atom->numFails;
1938            removePeer( t, peer );
1939        }
1940
1941        /* add some new ones */
1942        for( i=0;    i < nCandidates
1943                  && i < MAX_RECONNECTIONS_PER_PULSE
1944                  && newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND; ++i )
1945        {
1946            tr_peerMgr * mgr = t->manager;
1947            struct peer_atom * atom = candidates[i];
1948            tr_peerIo * io;
1949
1950            tordbg( t, "Starting an OUTGOING connection with %s",
1951                       tr_peerIoAddrStr( &atom->addr, atom->port ) );
1952
1953            io = tr_peerIoNewOutgoing( mgr->handle, &atom->addr, atom->port, t->hash );
1954            if( io == NULL )
1955            {
1956                atom->myflags |= MYFLAG_UNREACHABLE;
1957            }
1958            else
1959            {
1960                tr_handshake * handshake = tr_handshakeNew( io,
1961                                                            mgr->handle->encryptionMode,
1962                                                            myHandshakeDoneCB,
1963                                                            mgr );
1964
1965                assert( tr_peerIoGetTorrentHash( io ) != NULL );
1966
1967                ++newConnectionsThisSecond;
1968
1969                tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake, handshakeCompare );
1970            }
1971
1972            atom->time = time( NULL );
1973        }
1974
1975        /* cleanup */
1976        tr_free( connections );
1977        tr_free( candidates );
1978    }
1979
1980    torrentUnlock( t );
1981    return TRUE;
1982}
Note: See TracBrowser for help on using the repository browser.