source: trunk/libtransmission/peer-mgr.c @ 6628

Last change on this file since 6628 was 6628, checked in by muks, 13 years ago

Remove unused missingBlockCount

  • Property svn:keywords set to Date Rev Author Id
File size: 55.5 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 6628 2008-08-22 16:03:49Z muks $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp, strstr */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "blocklist.h"
23#include "clients.h"
24#include "completion.h"
25#include "crypto.h"
26#include "handshake.h"
27#include "inout.h" /* tr_ioTestPiece */
28#include "net.h"
29#include "peer-io.h"
30#include "peer-mgr.h"
31#include "peer-mgr-private.h"
32#include "peer-msgs.h"
33#include "ptrarray.h"
34#include "ratecontrol.h"
35#include "stats.h" /* tr_statsAddDownloaded */
36#include "torrent.h"
37#include "trevent.h"
38#include "utils.h"
39#include "webseed.h"
40
41enum
42{
43    /* how frequently to change which peers are choked */
44    RECHOKE_PERIOD_MSEC = (10 * 1000),
45
46    /* minimum interval for refilling peers' request lists */
47    REFILL_PERIOD_MSEC = 333,
48
49    /* when many peers are available, keep idle ones this long */
50    MIN_UPLOAD_IDLE_SECS = (60 * 3),
51
52    /* when few peers are available, keep idle ones this long */
53    MAX_UPLOAD_IDLE_SECS = (60 * 10),
54
55    /* how frequently to decide which peers live and die */
56    RECONNECT_PERIOD_MSEC = (2 * 1000),
57
58    /* max # of peers to ask fer per torrent per reconnect pulse */
59    MAX_RECONNECTIONS_PER_PULSE = 2,
60
61    /* max number of peers to ask for per second overall.
62     * this throttle is to avoid overloading the router */
63    MAX_CONNECTIONS_PER_SECOND = 4,
64
65    /* number of unchoked peers per torrent.
66     * FIXME: this probably ought to be configurable */
67    MAX_UNCHOKED_PEERS = 12,
68
69    /* number of bad pieces a peer is allowed to send before we ban them */
70    MAX_BAD_PIECES_PER_PEER = 3,
71
72    /* use for bitwise operations w/peer_atom.myflags */
73    MYFLAG_BANNED = 1,
74
75    /* unreachable for now... but not banned.  if they try to connect to us it's okay */
76    MYFLAG_UNREACHABLE = 2
77};
78
79
80/**
81***
82**/
83
84/* We keep one of these for every peer we know about, whether
85 * it's connected or not, so the struct must be small.
86 * When our current connections underperform, we dip back
87 * into this list for new ones. */
88struct peer_atom
89{   
90    uint8_t from;
91    uint8_t flags; /* these match the added_f flags */
92    uint8_t myflags; /* flags that aren't defined in added_f */
93    uint16_t port;
94    uint16_t numFails;
95    struct in_addr addr;
96    time_t time;
97    time_t piece_data_time;
98};
99
100typedef struct
101{
102    uint8_t hash[SHA_DIGEST_LENGTH];
103    tr_ptrArray * outgoingHandshakes; /* tr_handshake */
104    tr_ptrArray * pool; /* struct peer_atom */
105    tr_ptrArray * peers; /* tr_peer */
106    tr_ptrArray * webseeds; /* tr_webseed */
107    tr_timer * reconnectTimer;
108    tr_timer * rechokeTimer;
109    tr_timer * refillTimer;
110    tr_torrent * tor;
111    tr_peer * optimistic; /* the optimistic peer, or NULL if none */
112    tr_bitfield * requestedPieces;
113
114    unsigned int isRunning : 1;
115
116    struct tr_peerMgr * manager;
117}
118Torrent;
119
120struct tr_peerMgr
121{
122    tr_handle * handle;
123    tr_ptrArray * torrents; /* Torrent */
124    tr_ptrArray * incomingHandshakes; /* tr_handshake */
125};
126
127#define tordbg(t, fmt...) tr_deepLog( __FILE__, __LINE__, t->tor->info.name, ##fmt )
128
129/**
130***
131**/
132
133/* The following is not a very high quality random number generator.  It
134 * is mainly used as a simple stupid random number generator inside some
135 * functions below. Please don't use it for anything else unless you
136 * know what you're doing. Use tr_cryptoRandInt() instead.
137 */
138
139static int
140tr_stupidRandInt( int sup )
141{
142    static int init = 0;
143    assert( sup > 0 );
144
145    if( !init )
146    {
147        srand( tr_date() );
148        init = 1;
149    }
150
151    return rand() % sup;
152}
153
154static void
155managerLock( const struct tr_peerMgr * manager )
156{
157    tr_globalLock( manager->handle );
158}
159static void
160managerUnlock( const struct tr_peerMgr * manager )
161{
162    tr_globalUnlock( manager->handle );
163}
164static void
165torrentLock( Torrent * torrent )
166{
167    managerLock( torrent->manager );
168}
169static void
170torrentUnlock( Torrent * torrent )
171{
172    managerUnlock( torrent->manager );
173}
174static int
175torrentIsLocked( const Torrent * t )
176{
177    return tr_globalIsLocked( t->manager->handle );
178}
179
180/**
181***
182**/
183
184static int
185compareAddresses( const struct in_addr * a, const struct in_addr * b )
186{
187    if( a->s_addr != b->s_addr )
188        return a->s_addr < b->s_addr ? -1 : 1;
189
190    return 0;
191}
192
193static int
194handshakeCompareToAddr( const void * va, const void * vb )
195{
196    const tr_handshake * a = va;
197    return compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
198}
199
200static int
201handshakeCompare( const void * a, const void * b )
202{
203    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
204}
205
206static tr_handshake*
207getExistingHandshake( tr_ptrArray * handshakes, const struct in_addr * in_addr )
208{
209    return tr_ptrArrayFindSorted( handshakes,
210                                  in_addr,
211                                  handshakeCompareToAddr );
212}
213
214static int
215comparePeerAtomToAddress( const void * va, const void * vb )
216{
217    const struct peer_atom * a = va;
218    return compareAddresses( &a->addr, vb );
219}
220
221static int
222comparePeerAtoms( const void * va, const void * vb )
223{
224    const struct peer_atom * b = vb;
225    return comparePeerAtomToAddress( va, &b->addr );
226}
227
228/**
229***
230**/
231
232static int
233torrentCompare( const void * va, const void * vb )
234{
235    const Torrent * a = va;
236    const Torrent * b = vb;
237    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
238}
239
240static int
241torrentCompareToHash( const void * va, const void * vb )
242{
243    const Torrent * a = va;
244    const uint8_t * b_hash = vb;
245    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
246}
247
248static Torrent*
249getExistingTorrent( tr_peerMgr * manager, const uint8_t * hash )
250{
251    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
252                                             hash,
253                                             torrentCompareToHash );
254}
255
256static int
257peerCompare( const void * va, const void * vb )
258{
259    const tr_peer * a = va;
260    const tr_peer * b = vb;
261    return compareAddresses( &a->in_addr, &b->in_addr );
262}
263
264static int
265peerCompareToAddr( const void * va, const void * vb )
266{
267    const tr_peer * a = va;
268    return compareAddresses( &a->in_addr, vb );
269}
270
271static tr_peer*
272getExistingPeer( Torrent * torrent, const struct in_addr * in_addr )
273{
274    assert( torrentIsLocked( torrent ) );
275    assert( in_addr );
276
277    return tr_ptrArrayFindSorted( torrent->peers,
278                                  in_addr,
279                                  peerCompareToAddr );
280}
281
282static struct peer_atom*
283getExistingAtom( const Torrent * t, const struct in_addr * addr )
284{
285    assert( torrentIsLocked( t ) );
286    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
287}
288
289static int
290peerIsInUse( const Torrent * ct, const struct in_addr * addr )
291{
292    Torrent * t = (Torrent*) ct;
293
294    assert( torrentIsLocked ( t ) );
295
296    return getExistingPeer( t, addr )
297        || getExistingHandshake( t->outgoingHandshakes, addr )
298        || getExistingHandshake( t->manager->incomingHandshakes, addr );
299}
300
301static tr_peer*
302peerConstructor( const struct in_addr * in_addr )
303{
304    tr_peer * p;
305    p = tr_new0( tr_peer, 1 );
306    p->rcToClient = tr_rcInit( );
307    p->rcToPeer = tr_rcInit( );
308    memcpy( &p->in_addr, in_addr, sizeof(struct in_addr) );
309    return p;
310}
311
312static tr_peer*
313getPeer( Torrent * torrent, const struct in_addr * in_addr )
314{
315    tr_peer * peer;
316
317    assert( torrentIsLocked( torrent ) );
318
319    peer = getExistingPeer( torrent, in_addr );
320
321    if( peer == NULL ) {
322        peer = peerConstructor( in_addr );
323        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
324    }
325
326    return peer;
327}
328
329static void
330peerDestructor( tr_peer * peer )
331{
332    assert( peer );
333    assert( peer->msgs );
334
335    tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
336    tr_peerMsgsFree( peer->msgs );
337
338    tr_peerIoFree( peer->io );
339
340    tr_bitfieldFree( peer->have );
341    tr_bitfieldFree( peer->blame );
342    tr_rcClose( peer->rcToClient );
343    tr_rcClose( peer->rcToPeer );
344    tr_free( peer->client );
345    tr_free( peer );
346}
347
348static void
349removePeer( Torrent * t, tr_peer * peer )
350{
351    tr_peer * removed;
352    struct peer_atom * atom;
353
354    assert( torrentIsLocked( t ) );
355
356    atom = getExistingAtom( t, &peer->in_addr );
357    assert( atom );
358    atom->time = time( NULL );
359
360    removed = tr_ptrArrayRemoveSorted( t->peers, peer, peerCompare );
361    assert( removed == peer );
362    peerDestructor( removed );
363}
364
365static void
366removeAllPeers( Torrent * t )
367{
368    while( !tr_ptrArrayEmpty( t->peers ) )
369        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
370}
371
372static void
373torrentDestructor( Torrent * t )
374{
375    uint8_t hash[SHA_DIGEST_LENGTH];
376
377    assert( t );
378    assert( !t->isRunning );
379    assert( t->peers );
380    assert( torrentIsLocked( t ) );
381    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
382    assert( tr_ptrArrayEmpty( t->peers ) );
383
384    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
385
386    tr_timerFree( &t->reconnectTimer );
387    tr_timerFree( &t->rechokeTimer );
388    tr_timerFree( &t->refillTimer );
389
390    tr_bitfieldFree( t->requestedPieces );
391    tr_ptrArrayFree( t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
392    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
393    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
394    tr_ptrArrayFree( t->peers, NULL );
395
396    tr_free( t );
397}
398
399static void peerCallbackFunc( void * vpeer, void * vevent, void * vt );
400
401static Torrent*
402torrentConstructor( tr_peerMgr * manager, tr_torrent * tor )
403{
404    int i;
405    Torrent * t;
406
407    t = tr_new0( Torrent, 1 );
408    t->manager = manager;
409    t->tor = tor;
410    t->pool = tr_ptrArrayNew( );
411    t->peers = tr_ptrArrayNew( );
412    t->webseeds = tr_ptrArrayNew( );
413    t->outgoingHandshakes = tr_ptrArrayNew( );
414    t->requestedPieces = tr_bitfieldNew( tor->info.pieceCount );
415    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
416
417    for( i=0; i<tor->info.webseedCount; ++i ) {
418        tr_webseed * w = tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
419        tr_ptrArrayAppend( t->webseeds, w );
420    }
421
422    return t;
423}
424
425/**
426 * For explanation, see http://www.bittorrent.org/fast_extensions.html
427 * Also see the "test-allowed-set" unit test
428 */
429struct tr_bitfield *
430tr_peerMgrGenerateAllowedSet( const uint32_t         k,         /* number of pieces in set */
431                              const uint32_t         sz,        /* number of pieces in torrent */
432                              const uint8_t        * infohash,  /* torrent's SHA1 hash*/
433                              const struct in_addr * ip )       /* peer's address */
434{
435    uint8_t w[SHA_DIGEST_LENGTH + 4];
436    uint8_t x[SHA_DIGEST_LENGTH];
437    tr_bitfield * a;
438    uint32_t a_size;
439
440    *(uint32_t*)w = ntohl( htonl(ip->s_addr) & 0xffffff00 );   /* (1) */
441    memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );              /* (2) */
442    tr_sha1( x, w, sizeof( w ), NULL );                        /* (3) */
443
444    a = tr_bitfieldNew( sz );
445    a_size = 0;
446   
447    while( a_size < k )
448    {
449        int i;
450        for ( i=0; i<5 && a_size<k; ++i )                      /* (4) */
451        {
452            uint32_t j = i * 4;                                /* (5) */
453            uint32_t y = ntohl(*(uint32_t*)(x+j));             /* (6) */
454            uint32_t index = y % sz;                           /* (7) */
455            if( !tr_bitfieldHas( a, index ) ) {                /* (8) */
456                tr_bitfieldAdd( a, index );                    /* (9) */
457                ++a_size;
458            }
459        }
460        tr_sha1( x, x, sizeof( x ), NULL );                    /* (3) */
461    }
462   
463    return a;
464}
465
466tr_peerMgr*
467tr_peerMgrNew( tr_handle * handle )
468{
469    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
470    m->handle = handle;
471    m->torrents = tr_ptrArrayNew( );
472    m->incomingHandshakes = tr_ptrArrayNew( );
473    return m;
474}
475
476void
477tr_peerMgrFree( tr_peerMgr * manager )
478{
479    managerLock( manager );
480
481    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
482     * the item from manager->handshakes, so this is a little roundabout... */
483    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
484        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
485    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
486
487    /* free the torrents. */
488    tr_ptrArrayFree( manager->torrents, (PtrArrayForeachFunc)torrentDestructor );
489
490    managerUnlock( manager );
491    tr_free( manager );
492}
493
494static tr_peer**
495getConnectedPeers( Torrent * t, int * setmeCount )
496{
497    int i, peerCount, connectionCount;
498    tr_peer **peers;
499    tr_peer **ret;
500
501    assert( torrentIsLocked( t ) );
502
503    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
504    ret = tr_new( tr_peer*, peerCount );
505
506    for( i=connectionCount=0; i<peerCount; ++i )
507        if( peers[i]->msgs )
508            ret[connectionCount++] = peers[i];
509
510    *setmeCount = connectionCount;
511    return ret;
512}
513
514static int
515clientIsDownloadingFrom( const tr_peer * peer )
516{
517    return peer->clientIsInterested && !peer->clientIsChoked;
518}
519
520static int
521clientIsUploadingTo( const tr_peer * peer )
522{
523    return peer->peerIsInterested && !peer->peerIsChoked;
524}
525
526/***
527****
528***/
529
530int
531tr_peerMgrPeerIsSeed( const tr_peerMgr       * mgr,
532                      const uint8_t          * torrentHash,
533                      const struct in_addr   * addr )
534{
535    int isSeed = FALSE;
536    const Torrent * t = NULL;
537    const struct peer_atom * atom = NULL;
538
539    t = getExistingTorrent( (tr_peerMgr*)mgr, torrentHash );
540    if( t )
541        atom = getExistingAtom( t, addr );
542    if( atom )
543        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
544
545    return isSeed;
546}
547
548/***
549****  Refill
550***/
551
552struct tr_refill_piece
553{
554    tr_priority_t priority;
555    int random;
556    uint32_t piece;
557    uint32_t peerCount;
558};
559
560static int
561compareRefillPiece (const void * aIn, const void * bIn)
562{
563    const struct tr_refill_piece * a = aIn;
564    const struct tr_refill_piece * b = bIn;
565
566    /* if one piece has a higher priority, it goes first */
567    if( a->priority != b->priority )
568        return a->priority > b->priority ? -1 : 1;
569
570    /* otherwise if one has fewer peers, it goes first */
571    if( a->peerCount != b->peerCount )
572        return a->peerCount < b->peerCount ? -1 : 1;
573
574    /* otherwise go with our random seed */
575    if( a->random != b->random )
576        return a->random < b->random ? -1 : 1;
577
578    return 0;
579}
580
581static int
582isPieceInteresting( const tr_torrent  * tor,
583                    tr_piece_index_t    piece )
584{
585    if( tor->info.pieces[piece].dnd ) /* we don't want it */
586        return 0;
587
588    if( tr_cpPieceIsComplete( tor->completion, piece ) ) /* we have it */
589        return 0;
590
591    return 1;
592}
593
594static uint32_t*
595getPreferredPieces( Torrent     * t,
596                    uint32_t    * pieceCount )
597{
598    const tr_torrent * tor = t->tor;
599    const tr_info * inf = &tor->info;
600    tr_piece_index_t i;
601    uint32_t poolSize = 0;
602    uint32_t * pool = tr_new( uint32_t, inf->pieceCount );
603    int peerCount;
604    tr_peer** peers;
605
606    assert( torrentIsLocked( t ) );
607
608    peers = getConnectedPeers( t, &peerCount );
609
610    for( i=0; i<inf->pieceCount; ++i )
611        if( isPieceInteresting( tor, i ) )
612            pool[poolSize++] = i;
613
614    /* sort the pool from most interesting to least... */
615    if( poolSize > 1 )
616    {
617        uint32_t j;
618        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
619
620        for( j=0; j<poolSize; ++j )
621        {
622            int k;
623            const tr_piece_index_t piece = pool[j];
624            struct tr_refill_piece * setme = p + j;
625
626            setme->piece = piece;
627            setme->priority = inf->pieces[piece].priority;
628            setme->peerCount = 0;
629            setme->random = tr_stupidRandInt( INT_MAX );
630
631            for( k=0; k<peerCount; ++k ) {
632                const tr_peer * peer = peers[k];
633                if( peer->peerIsInterested && !peer->clientIsChoked && tr_bitfieldHas( peer->have, piece ) )
634                    ++setme->peerCount;
635            }
636        }
637
638        qsort( p, poolSize, sizeof(struct tr_refill_piece), compareRefillPiece );
639
640        for( j=0; j<poolSize; ++j )
641            pool[j] = p[j].piece;
642
643        tr_free( p );
644    }
645
646    tr_free( peers );
647
648    *pieceCount = poolSize;
649    return pool;
650}
651
652static tr_peer**
653getPeersUploadingToClient( Torrent * t, int * setmeCount )
654{
655    int i;
656    int peerCount = 0;
657    int retCount = 0;
658    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
659    tr_peer ** ret = tr_new( tr_peer*, peerCount );
660
661    /* get a list of peers we're downloading from */
662    for( i=0; i<peerCount; ++i )
663        if( clientIsDownloadingFrom( peers[i] ) )
664            ret[retCount++] = peers[i];
665
666    /* pick a different starting point each time so all peers
667     * get a chance at the first blocks in the queue */
668    if( retCount ) {
669        tr_peer ** tmp = tr_new( tr_peer*, retCount );
670        i = tr_stupidRandInt( retCount );
671        memcpy( tmp, ret, sizeof(tr_peer*) * retCount );
672        memcpy( ret, tmp+i, sizeof(tr_peer*) * (retCount-i) );
673        memcpy( ret+(retCount-i), tmp, sizeof(tr_peer*) * i );
674        tr_free( tmp );
675    }
676
677    *setmeCount = retCount;
678    return ret;
679}
680
681static int
682refillPulse( void * vtorrent )
683{
684    Torrent * t = vtorrent;
685    tr_torrent * tor = t->tor;
686    int peerCount;
687    int webseedCount;
688    tr_peer ** peers;
689    tr_webseed ** webseeds;
690    uint32_t pieceCount;
691    uint32_t * pieces;
692    tr_piece_index_t i;
693
694    if( !t->isRunning )
695        return TRUE;
696    if( tr_torrentIsSeed( t->tor ) )
697        return TRUE;
698
699    torrentLock( t );
700    tordbg( t, "Refilling Request Buffers..." );
701
702    pieces = getPreferredPieces( t, &pieceCount );
703    peers = getPeersUploadingToClient( t, &peerCount );
704    webseedCount = tr_ptrArraySize( t->webseeds );
705    webseeds = tr_memdup( tr_ptrArrayBase(t->webseeds), webseedCount*sizeof(tr_webseed*) );
706
707    for( i=0; (webseedCount || peerCount) && i<pieceCount; ++i )
708    {
709        int j;
710        int handled = FALSE;
711        const tr_piece_index_t piece = pieces[i];
712
713        assert( piece < tor->info.pieceCount );
714
715        /* find a peer who can ask for this piece */
716        for( j=0; !handled && j<peerCount; )
717        {
718            const int val = tr_peerMsgsAddRequest( peers[j]->msgs, piece );
719            switch( val )
720            {
721                case TR_ADDREQ_FULL: 
722                case TR_ADDREQ_CLIENT_CHOKED:
723                    memmove( peers+j, peers+j+1, sizeof(tr_peer*)*(--peerCount-j) );
724                    break;
725                case TR_ADDREQ_MISSING: 
726                case TR_ADDREQ_DUPLICATE: 
727                    ++j;
728                    break;
729                case TR_ADDREQ_OK:
730                    tr_bitfieldAdd( t->requestedPieces, piece );
731                    handled = TRUE;
732                    break;
733                default:
734                    assert( 0 && "unhandled value" );
735                    break;
736            }
737        }
738
739        /* maybe one of the webseeds can do it */
740        for( j=0; !handled && j<webseedCount; )
741        {
742            const int val = tr_webseedAddRequest( webseeds[j], piece );
743            switch( val )
744            {
745                case TR_ADDREQ_FULL: 
746                    memmove( webseeds+j, webseeds+j+1, sizeof(tr_webseed*)*(--webseedCount-j) );
747                    break;
748                case TR_ADDREQ_OK:
749                    tr_bitfieldAdd( t->requestedPieces, piece );
750                    handled = TRUE;
751                    break;
752                default:
753                    assert( 0 && "unhandled value" );
754                    break;
755            }
756        }
757    }
758
759    /* cleanup */
760    tr_free( webseeds );
761    tr_free( peers );
762    tr_free( pieces );
763
764    t->refillTimer = NULL;
765    torrentUnlock( t );
766    return FALSE;
767}
768
769static void
770addStrike( Torrent * t, tr_peer * peer )
771{
772    tordbg( t, "increasing peer %s strike count to %d", tr_peerIoAddrStr(&peer->in_addr,peer->port), peer->strikes+1 );
773
774    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
775    {
776        struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
777        atom->myflags |= MYFLAG_BANNED;
778        peer->doPurge = 1;
779        tordbg( t, "banning peer %s", tr_peerIoAddrStr(&atom->addr,atom->port) );
780    }
781}
782
783static void
784gotBadPiece( Torrent * t, tr_piece_index_t pieceIndex )
785{
786    tr_torrent * tor = t->tor;
787    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
788    tor->corruptCur += byteCount;
789    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
790}
791
792static void
793refillSoon( Torrent * t )
794{
795    if( t->refillTimer == NULL )
796        t->refillTimer = tr_timerNew( t->manager->handle,
797                                      refillPulse, t,
798                                      REFILL_PERIOD_MSEC );
799}
800
801static void
802peerCallbackFunc( void * vpeer, void * vevent, void * vt )
803{
804    tr_peer * peer = vpeer; /* may be NULL if peer is a webseed */
805    Torrent * t = (Torrent *) vt;
806    const tr_peer_event * e = vevent;
807
808    torrentLock( t );
809
810    switch( e->eventType )
811    {
812        case TR_PEER_NEED_REQ:
813            refillSoon( t );
814            break;
815
816        case TR_PEER_CANCEL:
817            tr_bitfieldRem( t->requestedPieces, e->pieceIndex );
818            break;
819
820        case TR_PEER_PEER_GOT_DATA: {
821            const time_t now = time( NULL );
822            tr_torrent * tor = t->tor;
823            tor->activityDate = now;
824            tor->uploadedCur += e->length;
825            tr_rcTransferred( tor->upload, e->length );
826            tr_rcTransferred( tor->handle->upload, e->length );
827            tr_statsAddUploaded( tor->handle, e->length );
828            if( peer ) {
829                struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
830                atom->piece_data_time = time( NULL );
831            }
832            break;
833        }
834
835        case TR_PEER_CLIENT_GOT_DATA: {
836            const time_t now = time( NULL );
837            tr_torrent * tor = t->tor;
838            tor->activityDate = now;
839            /* only add this to downloadedCur if we got it from a peer --
840             * webseeds shouldn't count against our ratio.  As one tracker
841             * admin put it, "Those pieces are downloaded directly from the
842             * content distributor, not the peers, it is the tracker's job
843             * to manage the swarms, not the web server and does not fit
844             * into the jurisdiction of the tracker." */
845            if( peer )
846                tor->downloadedCur += e->length;
847            tr_rcTransferred( tor->download, e->length );
848            tr_rcTransferred( tor->handle->download, e->length );
849            tr_statsAddDownloaded( tor->handle, e->length );
850            if( peer ) {
851                struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
852                atom->piece_data_time = time( NULL );
853            }
854            break;
855        }
856
857        case TR_PEER_PEER_PROGRESS: {
858            if( peer ) {
859                struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
860                const int peerIsSeed = e->progress >= 1.0;
861                if( peerIsSeed ) {
862                    tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr(&atom->addr,atom->port) );
863                    atom->flags |= ADDED_F_SEED_FLAG;
864                } else {
865                    tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr(&atom->addr,atom->port) );
866                    atom->flags &= ~ADDED_F_SEED_FLAG;
867                }
868            }
869            break;
870        }
871
872        case TR_PEER_CLIENT_GOT_BLOCK:
873        {
874            tr_torrent * tor = t->tor;
875
876            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
877
878            tr_cpBlockAdd( tor->completion, block );
879
880            if( tr_cpPieceIsComplete( tor->completion, e->pieceIndex ) )
881            {
882                const tr_piece_index_t p = e->pieceIndex;
883                const tr_errno err = tr_ioTestPiece( tor, p );
884
885                if( err ) {
886                    tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test: %s" ),
887                               (unsigned long)p, tr_errorString( err ) );
888                }
889
890                tr_torrentSetHasPiece( tor, p, !err );
891                tr_torrentSetPieceChecked( tor, p, TRUE );
892                tr_peerMgrSetBlame( tor->handle->peerMgr, tor->info.hash, p, !err );
893
894                if( err )
895                    gotBadPiece( t, p );
896                else {
897                    int i, peerCount;
898                    tr_peer ** peers = getConnectedPeers( t, &peerCount );
899                    for( i=0; i<peerCount; ++i )
900                        tr_peerMsgsHave( peers[i]->msgs, p );
901                    tr_free( peers );
902                }
903
904                tr_torrentRecheckCompleteness( tor );
905            }
906            break;
907        }
908
909        case TR_PEER_ERROR:
910            if( TR_ERROR_IS_IO( e->err ) ) {
911                t->tor->error = e->err;
912                tr_strlcpy( t->tor->errorString, tr_errorString( e->err ), sizeof(t->tor->errorString) );
913                tr_torrentStop( t->tor );
914            } else if( e->err == TR_ERROR_ASSERT ) {
915                addStrike( t, peer );
916            }
917            peer->doPurge = 1;
918            break;
919
920        default:
921            assert(0);
922    }
923
924    torrentUnlock( t );
925}
926
927static void
928ensureAtomExists( Torrent * t, const struct in_addr * addr, uint16_t port, uint8_t flags, uint8_t from )
929{
930    if( getExistingAtom( t, addr ) == NULL )
931    {
932        struct peer_atom * a;
933        a = tr_new0( struct peer_atom, 1 );
934        a->addr = *addr;
935        a->port = port;
936        a->flags = flags;
937        a->from = from;
938        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr(&a->addr,a->port) );
939        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
940    }
941}
942
943static int
944getMaxPeerCount( const tr_torrent * tor UNUSED )
945{
946    return tor->maxConnectedPeers;
947}
948
949/* FIXME: this is kind of a mess. */
950static void
951myHandshakeDoneCB( tr_handshake    * handshake,
952                   tr_peerIo       * io,
953                   int               isConnected,
954                   const uint8_t   * peer_id,
955                   void            * vmanager )
956{
957    int ok = isConnected;
958    uint16_t port;
959    const struct in_addr * addr;
960    tr_peerMgr * manager = (tr_peerMgr*) vmanager;
961    Torrent * t;
962    tr_handshake * ours;
963
964    assert( io );
965    assert( isConnected==0 || isConnected==1 );
966
967    t = tr_peerIoHasTorrentHash( io )
968        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
969        : NULL;
970
971    if( tr_peerIoIsIncoming ( io ) )
972        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
973                                        handshake, handshakeCompare );
974    else if( t )
975        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
976                                        handshake, handshakeCompare );
977    else
978        ours = handshake;
979
980    assert( ours );
981    assert( ours == handshake );
982
983    if( t )
984        torrentLock( t );
985
986    addr = tr_peerIoGetAddress( io, &port );
987
988    if( !ok || !t || !t->isRunning )
989    {
990        if( t ) {
991            struct peer_atom * atom = getExistingAtom( t, addr );
992            if( atom )
993                ++atom->numFails;
994        }
995
996        tr_peerIoFree( io );
997    }
998    else /* looking good */
999    {
1000        struct peer_atom * atom;
1001        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1002        atom = getExistingAtom( t, addr );
1003
1004        if( atom->myflags & MYFLAG_BANNED )
1005        {
1006            tordbg( t, "banned peer %s tried to reconnect", tr_peerIoAddrStr(&atom->addr,atom->port) );
1007            tr_peerIoFree( io );
1008        }
1009        else if( tr_ptrArraySize( t->peers ) >= getMaxPeerCount( t->tor ) )
1010        {
1011            tr_peerIoFree( io );
1012        }
1013        else
1014        {
1015            tr_peer * peer = getExistingPeer( t, addr );
1016
1017            if( peer ) /* we already have this peer */
1018            {
1019                tr_peerIoFree( io );
1020            }
1021            else
1022            {
1023                peer = getPeer( t, addr );
1024                tr_free( peer->client );
1025
1026                if( !peer_id )
1027                    peer->client = NULL;
1028                else {
1029                    char client[128];
1030                    tr_clientForId( client, sizeof( client ), peer_id );
1031                    peer->client = tr_strdup( client );
1032                }
1033                peer->port = port;
1034                peer->io = io;
1035                peer->msgs = tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1036                atom->time = time( NULL );
1037            }
1038        }
1039    }
1040
1041    if( t )
1042        torrentUnlock( t );
1043}
1044
1045void
1046tr_peerMgrAddIncoming( tr_peerMgr      * manager,
1047                       struct in_addr  * addr,
1048                       uint16_t          port,
1049                       int               socket )
1050{
1051    managerLock( manager );
1052
1053    if( tr_sessionIsAddressBlocked( manager->handle, addr ) )
1054    {
1055        tr_dbg( "Banned IP address \"%s\" tried to connect to us",
1056                inet_ntoa( *addr ) );
1057        tr_netClose( socket );
1058    }
1059    else if( getExistingHandshake( manager->incomingHandshakes, addr ) )
1060    {
1061        tr_netClose( socket );
1062    }
1063    else /* we don't have a connetion to them yet... */
1064    {
1065        tr_peerIo * io;
1066        tr_handshake * handshake;
1067
1068        io = tr_peerIoNewIncoming( manager->handle, addr, port, socket );
1069
1070        handshake = tr_handshakeNew( io,
1071                                     manager->handle->encryptionMode,
1072                                     myHandshakeDoneCB,
1073                                     manager );
1074
1075        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake, handshakeCompare );
1076    }
1077
1078    managerUnlock( manager );
1079}
1080
1081void
1082tr_peerMgrAddPex( tr_peerMgr     * manager,
1083                  const uint8_t  * torrentHash,
1084                  uint8_t          from,
1085                  const tr_pex   * pex )
1086{
1087    Torrent * t;
1088    managerLock( manager );
1089
1090    t = getExistingTorrent( manager, torrentHash );
1091    if( !tr_sessionIsAddressBlocked( t->manager->handle, &pex->in_addr ) )
1092        ensureAtomExists( t, &pex->in_addr, pex->port, pex->flags, from );
1093
1094    managerUnlock( manager );
1095}
1096
1097tr_pex *
1098tr_peerMgrCompactToPex( const void     * compact,
1099                        size_t           compactLen,
1100                        const uint8_t  * added_f,
1101                        size_t           added_f_len,
1102                        size_t         * pexCount )
1103{
1104    size_t i;
1105    size_t n = compactLen / 6;
1106    const uint8_t * walk = compact;
1107    tr_pex * pex = tr_new0( tr_pex, n );
1108
1109    for( i=0; i<n; ++i ) {
1110        memcpy( &pex[i].in_addr, walk, 4 ); walk += 4;
1111        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1112        if( added_f && ( n == added_f_len ) )
1113            pex[i].flags = added_f[i];
1114    }
1115
1116    *pexCount = n;
1117    return pex;
1118}
1119
1120/**
1121***
1122**/
1123
1124void
1125tr_peerMgrSetBlame( tr_peerMgr     * manager,
1126                    const uint8_t  * torrentHash,
1127                    tr_piece_index_t pieceIndex,
1128                    int              success )
1129{
1130    if( !success )
1131    {
1132        int peerCount, i;
1133        Torrent * t = getExistingTorrent( manager, torrentHash );
1134        tr_peer ** peers;
1135
1136        assert( torrentIsLocked( t ) );
1137
1138        peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1139        for( i=0; i<peerCount; ++i )
1140        {
1141            tr_peer * peer = peers[i];
1142            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1143            {
1144                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1145                           tr_peerIoAddrStr(&peer->in_addr,peer->port),
1146                           pieceIndex, (int)peer->strikes+1 );
1147                addStrike( t, peer );
1148            }
1149        }
1150    }
1151}
1152
1153int
1154tr_pexCompare( const void * va, const void * vb )
1155{
1156    const tr_pex * a = va;
1157    const tr_pex * b = vb;
1158    int i = memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
1159    if( i ) return i;
1160    if( a->port < b->port ) return -1;
1161    if( a->port > b->port ) return 1;
1162    return 0;
1163}
1164
1165int tr_pexCompare( const void * a, const void * b );
1166
1167static int
1168peerPrefersCrypto( const tr_peer * peer )
1169{
1170    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1171        return TRUE;
1172
1173    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1174        return FALSE;
1175
1176    return tr_peerIoIsEncrypted( peer->io );
1177};
1178
1179int
1180tr_peerMgrGetPeers( tr_peerMgr      * manager,
1181                    const uint8_t   * torrentHash,
1182                    tr_pex         ** setme_pex )
1183{
1184    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1185    int i, peerCount;
1186    const tr_peer ** peers;
1187    tr_pex * pex;
1188    tr_pex * walk;
1189
1190    managerLock( manager );
1191
1192    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1193    pex = walk = tr_new( tr_pex, peerCount );
1194
1195    for( i=0; i<peerCount; ++i, ++walk )
1196    {
1197        const tr_peer * peer = peers[i];
1198
1199        walk->in_addr = peer->in_addr;
1200
1201        walk->port = peer->port;
1202
1203        walk->flags = 0;
1204        if( peerPrefersCrypto(peer) )  walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1205        if( peer->progress >= 1.0 )    walk->flags |= ADDED_F_SEED_FLAG;
1206    }
1207
1208    assert( ( walk - pex ) == peerCount );
1209    qsort( pex, peerCount, sizeof(tr_pex), tr_pexCompare );
1210    *setme_pex = pex;
1211
1212    managerUnlock( manager );
1213
1214    return peerCount;
1215}
1216
1217static int reconnectPulse( void * vtorrent );
1218static int rechokePulse( void * vtorrent );
1219
1220void
1221tr_peerMgrStartTorrent( tr_peerMgr     * manager,
1222                        const uint8_t  * torrentHash )
1223{
1224    Torrent * t;
1225
1226    managerLock( manager );
1227
1228    t = getExistingTorrent( manager, torrentHash );
1229
1230    assert( t );
1231    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1232    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1233
1234    if( !t->isRunning )
1235    {
1236        t->isRunning = 1;
1237
1238        t->reconnectTimer = tr_timerNew( t->manager->handle,
1239                                         reconnectPulse, t,
1240                                         RECONNECT_PERIOD_MSEC );
1241
1242        t->rechokeTimer = tr_timerNew( t->manager->handle,
1243                                       rechokePulse, t,
1244                                       RECHOKE_PERIOD_MSEC );
1245
1246        reconnectPulse( t );
1247
1248        rechokePulse( t );
1249
1250        if( !tr_ptrArrayEmpty( t->webseeds ) )
1251            refillSoon( t );
1252    }
1253
1254    managerUnlock( manager );
1255}
1256
1257static void
1258stopTorrent( Torrent * t )
1259{
1260    assert( torrentIsLocked( t ) );
1261
1262    t->isRunning = 0;
1263    tr_timerFree( &t->rechokeTimer );
1264    tr_timerFree( &t->reconnectTimer );
1265
1266    /* disconnect the peers. */
1267    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)peerDestructor );
1268    tr_ptrArrayClear( t->peers );
1269
1270    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1271     * which removes the handshake from t->outgoingHandshakes... */
1272    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1273        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1274}
1275void
1276tr_peerMgrStopTorrent( tr_peerMgr     * manager,
1277                       const uint8_t  * torrentHash)
1278{
1279    managerLock( manager );
1280
1281    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1282
1283    managerUnlock( manager );
1284}
1285
1286void
1287tr_peerMgrAddTorrent( tr_peerMgr * manager,
1288                      tr_torrent * tor )
1289{
1290    Torrent * t;
1291
1292    managerLock( manager );
1293
1294    assert( tor );
1295    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1296
1297    t = torrentConstructor( manager, tor );
1298    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1299
1300    managerUnlock( manager );
1301}
1302
1303void
1304tr_peerMgrRemoveTorrent( tr_peerMgr     * manager,
1305                         const uint8_t  * torrentHash )
1306{
1307    Torrent * t;
1308
1309    managerLock( manager );
1310
1311    t = getExistingTorrent( manager, torrentHash );
1312    assert( t );
1313    stopTorrent( t );
1314    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1315    torrentDestructor( t );
1316
1317    managerUnlock( manager );
1318}
1319
1320void
1321tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1322                               const uint8_t    * torrentHash,
1323                               int8_t           * tab,
1324                               unsigned int       tabCount )
1325{
1326    tr_piece_index_t i;
1327    const Torrent * t;
1328    const tr_torrent * tor;
1329    float interval;
1330    int isComplete;
1331    int peerCount;
1332    const tr_peer ** peers;
1333
1334    managerLock( manager );
1335
1336    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1337    tor = t->tor;
1338    interval = tor->info.pieceCount / (float)tabCount;
1339    isComplete = tor && ( tr_cpGetStatus ( tor->completion ) == TR_CP_COMPLETE );
1340    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1341
1342    memset( tab, 0, tabCount );
1343
1344    for( i=0; tor && i<tabCount; ++i )
1345    {
1346        const int piece = i * interval;
1347
1348        if( isComplete || tr_cpPieceIsComplete( tor->completion, piece ) )
1349            tab[i] = -1;
1350        else if( peerCount ) {
1351            int j;
1352            for( j=0; j<peerCount; ++j )
1353                if( tr_bitfieldHas( peers[j]->have, i ) )
1354                    ++tab[i];
1355        }
1356    }
1357
1358    managerUnlock( manager );
1359}
1360
1361/* Returns the pieces that are available from peers */
1362tr_bitfield*
1363tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1364                        const uint8_t    * torrentHash )
1365{
1366    int i, size;
1367    Torrent * t;
1368    tr_peer ** peers;
1369    tr_bitfield * pieces;
1370    managerLock( manager );
1371
1372    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1373    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1374    peers = getConnectedPeers( t, &size );
1375    for( i=0; i<size; ++i )
1376        tr_bitfieldOr( pieces, peers[i]->have );
1377
1378    managerUnlock( manager );
1379    tr_free( peers );
1380    return pieces;
1381}
1382
1383int
1384tr_peerMgrHasConnections( const tr_peerMgr * manager,
1385                          const uint8_t    * torrentHash )
1386{
1387    int ret;
1388    const Torrent * t;
1389    managerLock( manager );
1390
1391    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1392    ret = t && ( !tr_ptrArrayEmpty( t->peers ) || !tr_ptrArrayEmpty( t->webseeds ) );
1393
1394    managerUnlock( manager );
1395    return ret;
1396}
1397
1398void
1399tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1400                        const uint8_t    * torrentHash,
1401                        int              * setmePeersKnown,
1402                        int              * setmePeersConnected,
1403                        int              * setmeSeedsConnected,
1404                        int              * setmeWebseedsSendingToUs,
1405                        int              * setmePeersSendingToUs,
1406                        int              * setmePeersGettingFromUs,
1407                        int              * setmePeersFrom )
1408{
1409    int i, size;
1410    const Torrent * t;
1411    const tr_peer ** peers;
1412    const tr_webseed ** webseeds;
1413
1414    managerLock( manager );
1415
1416    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1417    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1418
1419    *setmePeersKnown           = tr_ptrArraySize( t->pool );
1420    *setmePeersConnected       = 0;
1421    *setmeSeedsConnected       = 0;
1422    *setmePeersGettingFromUs   = 0;
1423    *setmePeersSendingToUs     = 0;
1424    *setmeWebseedsSendingToUs  = 0;
1425
1426    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1427        setmePeersFrom[i] = 0;
1428
1429    for( i=0; i<size; ++i )
1430    {
1431        const tr_peer * peer = peers[i];
1432        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1433
1434        if( peer->io == NULL ) /* not connected */
1435            continue;
1436
1437        ++*setmePeersConnected;
1438
1439        ++setmePeersFrom[atom->from];
1440
1441        if( clientIsDownloadingFrom( peer ) )
1442            ++*setmePeersSendingToUs;
1443
1444        if( clientIsUploadingTo( peer ) )
1445            ++*setmePeersGettingFromUs;
1446
1447        if( atom->flags & ADDED_F_SEED_FLAG )
1448            ++*setmeSeedsConnected;
1449    }
1450
1451    webseeds = (const tr_webseed **) tr_ptrArrayPeek( t->webseeds, &size );
1452    for( i=0; i<size; ++i )
1453    {
1454        if( tr_webseedIsActive( webseeds[i] ) )
1455            ++*setmeWebseedsSendingToUs;
1456    }
1457
1458    managerUnlock( manager );
1459}
1460
1461float*
1462tr_peerMgrWebSpeeds( const tr_peerMgr * manager,
1463                     const uint8_t    * torrentHash )
1464{
1465    const Torrent * t;
1466    const tr_webseed ** webseeds;
1467    int i;
1468    int webseedCount;
1469    float * ret;
1470
1471    assert( manager );
1472    managerLock( manager );
1473
1474    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1475    webseeds = (const tr_webseed**) tr_ptrArrayPeek( t->webseeds, &webseedCount );
1476    assert( webseedCount == t->tor->info.webseedCount );
1477    ret = tr_new0( float, webseedCount );
1478
1479    for( i=0; i<webseedCount; ++i )
1480        if( !tr_webseedGetSpeed( webseeds[i], &ret[i] ) )
1481            ret[i] = -1.0;
1482
1483    managerUnlock( manager );
1484    return ret;
1485}
1486
1487struct tr_peer_stat *
1488tr_peerMgrPeerStats( const tr_peerMgr  * manager,
1489                     const uint8_t     * torrentHash,
1490                     int               * setmeCount UNUSED )
1491{
1492    int i, size;
1493    const Torrent * t;
1494    tr_peer ** peers;
1495    tr_peer_stat * ret;
1496
1497    assert( manager );
1498    managerLock( manager );
1499
1500    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1501    peers = getConnectedPeers( (Torrent*)t, &size );
1502    ret = tr_new0( tr_peer_stat, size );
1503
1504    for( i=0; i<size; ++i )
1505    {
1506        char * pch;
1507        const tr_peer * peer = peers[i];
1508        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1509        tr_peer_stat * stat = ret + i;
1510
1511        tr_netNtop( &peer->in_addr, stat->addr, sizeof(stat->addr) );
1512        tr_strlcpy( stat->client, (peer->client ? peer->client : ""), sizeof(stat->client) );
1513        stat->port               = peer->port;
1514        stat->from               = atom->from;
1515        stat->progress           = peer->progress;
1516        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1517        stat->uploadToRate       = peer->rateToPeer;
1518        stat->downloadFromRate   = peer->rateToClient;
1519        stat->peerIsChoked       = peer->peerIsChoked;
1520        stat->peerIsInterested   = peer->peerIsInterested;
1521        stat->clientIsChoked     = peer->clientIsChoked;
1522        stat->clientIsInterested = peer->clientIsInterested;
1523        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1524        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1525        stat->isUploadingTo      = clientIsUploadingTo( peer );
1526
1527        pch = stat->flagStr;
1528        if( t->optimistic == peer ) *pch++ = 'O';
1529        if( stat->isDownloadingFrom ) *pch++ = 'D';
1530        else if( stat->clientIsInterested ) *pch++ = 'd';
1531        if( stat->isUploadingTo ) *pch++ = 'U';
1532        else if( stat->peerIsInterested ) *pch++ = 'u';
1533        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
1534        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1535        if( stat->isEncrypted ) *pch++ = 'E';
1536        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1537        if( stat->isIncoming ) *pch++ = 'I';
1538        *pch = '\0';
1539    }
1540
1541    *setmeCount = size;
1542    tr_free( peers );
1543
1544    managerUnlock( manager );
1545    return ret;
1546}
1547
1548/**
1549***
1550**/
1551
1552struct ChokeData
1553{
1554    unsigned int  doUnchoke     : 1;
1555    unsigned int  isInterested  : 1;
1556    tr_peer * peer;
1557};
1558
1559static int
1560tr_compareDouble( double a, double b )
1561{
1562    if( a < b ) return -1;
1563    if( a > b ) return 1;
1564    return 0;
1565}
1566
1567static int
1568compareChoke( const void * va, const void * vb )
1569{
1570    const struct ChokeData * a = va;
1571    const struct ChokeData * b = vb;
1572    int diff = 0;
1573
1574    if( diff == 0 ) /* prefer higher dl speeds */
1575        diff = -tr_compareDouble( a->peer->rateToClient, b->peer->rateToClient );
1576    if( diff == 0 ) /* prefer higher ul speeds */
1577        diff = -tr_compareDouble( a->peer->rateToPeer, b->peer->rateToPeer );
1578    if( diff == 0 ) /* prefer unchoked */
1579        diff = (int)a->peer->peerIsChoked - (int)b->peer->peerIsChoked;
1580
1581    return diff;
1582}
1583
1584static int
1585isNew( const tr_peer * peer )
1586{
1587    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1588}
1589
1590static int
1591isSame( const tr_peer * peer )
1592{
1593    return peer && peer->client && strstr( peer->client, "Transmission" );
1594}
1595
1596/**
1597***
1598**/
1599
1600static void
1601rechoke( Torrent * t )
1602{
1603    int i, peerCount, size, unchokedInterested;
1604    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1605    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1606
1607    assert( torrentIsLocked( t ) );
1608   
1609    /* sort the peers by preference and rate */
1610    for( i=0, size=0; i<peerCount; ++i )
1611    {
1612        tr_peer * peer = peers[i];
1613        if( peer->progress >= 1.0 ) /* choke all seeds */
1614            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1615        else {
1616            struct ChokeData * node = &choke[size++];
1617            node->peer = peer;
1618            node->isInterested = peer->peerIsInterested;
1619        }
1620    }
1621
1622    qsort( choke, size, sizeof(struct ChokeData), compareChoke );
1623
1624    /**
1625     * Reciprocation and number of uploads capping is managed by unchoking
1626     * the N peers which have the best upload rate and are interested.
1627     * This maximizes the client's download rate. These N peers are
1628     * referred to as downloaders, because they are interested in downloading
1629     * from the client.
1630     *
1631     * Peers which have a better upload rate (as compared to the downloaders)
1632     * but aren't interested get unchoked. If they become interested, the
1633     * downloader with the worst upload rate gets choked. If a client has
1634     * a complete file, it uses its upload rate rather than its download
1635     * rate to decide which peers to unchoke.
1636     */
1637    unchokedInterested = 0;
1638    for( i=0; i<size && unchokedInterested<MAX_UNCHOKED_PEERS; ++i ) {
1639        choke[i].doUnchoke = 1;
1640        if( choke[i].isInterested )
1641            ++unchokedInterested;
1642    }
1643
1644    /* optimistic unchoke */
1645    if( i < size )
1646    {
1647        int n;
1648        struct ChokeData * c;
1649        tr_ptrArray * randPool = tr_ptrArrayNew( );
1650
1651        for( ; i<size; ++i )
1652        {
1653            if( choke[i].isInterested )
1654            {
1655                const tr_peer * peer = choke[i].peer;
1656                int x=1, y;
1657                if( isNew( peer ) ) x *= 3;
1658                if( isSame( peer ) ) x *= 3;
1659                for( y=0; y<x; ++y )
1660                    tr_ptrArrayAppend( randPool, &choke[i] );
1661            }
1662        }
1663
1664        if(( n = tr_ptrArraySize( randPool )))
1665        {
1666            c = tr_ptrArrayNth( randPool, tr_stupidRandInt( n ));
1667            c->doUnchoke = 1;
1668            t->optimistic = c->peer;
1669        }
1670
1671        tr_ptrArrayFree( randPool, NULL );
1672    }
1673
1674    for( i=0; i<size; ++i )
1675        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1676
1677    /* cleanup */
1678    tr_free( choke );
1679    tr_free( peers );
1680}
1681
1682static int
1683rechokePulse( void * vtorrent )
1684{
1685    Torrent * t = vtorrent;
1686    torrentLock( t );
1687    rechoke( t );
1688    torrentUnlock( t );
1689    return TRUE;
1690}
1691
1692/***
1693****
1694****  Life and Death
1695****
1696***/
1697
1698static int
1699shouldPeerBeClosed( const Torrent * t, const tr_peer * peer, int peerCount )
1700{
1701    const tr_torrent * tor = t->tor;
1702    const time_t now = time( NULL );
1703    const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1704
1705    /* if it's marked for purging, close it */
1706    if( peer->doPurge ) {
1707        tordbg( t, "purging peer %s because its doPurge flag is set", tr_peerIoAddrStr(&atom->addr,atom->port) );
1708        return TRUE;
1709    }
1710
1711    /* if we're seeding and the peer has everything we have,
1712     * and enough time has passed for a pex exchange, then disconnect */
1713    if( tr_torrentIsSeed( tor ) ) {
1714        int peerHasEverything;
1715        if( atom->flags & ADDED_F_SEED_FLAG )
1716            peerHasEverything = TRUE;
1717        else if( peer->progress < tr_cpPercentDone( tor->completion ) )
1718            peerHasEverything = FALSE;
1719        else {
1720            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( tor->completion ) );
1721            tr_bitfieldDifference( tmp, peer->have );
1722            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
1723            tr_bitfieldFree( tmp );
1724        }
1725        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30) ) ) {
1726            tordbg( t, "purging peer %s because we're both seeds", tr_peerIoAddrStr(&atom->addr,atom->port) );
1727            return TRUE;
1728        }
1729    }
1730
1731    /* disconnect if it's been too long since piece data has been transferred.
1732     * this is on a sliding scale based on number of available peers... */
1733    {
1734        const int relaxStrictnessIfFewerThanN = (int)((getMaxPeerCount(tor) * 0.9) + 0.5);
1735        /* if we have >= relaxIfFewerThan, strictness is 100%.
1736         * if we have zero connections, strictness is 0% */
1737        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
1738            ? 1.0
1739            : peerCount / (float)relaxStrictnessIfFewerThanN;
1740        const int lo = MIN_UPLOAD_IDLE_SECS;
1741        const int hi = MAX_UPLOAD_IDLE_SECS;
1742        const int limit = lo + ((hi-lo) * strictness);
1743        const time_t then = peer->pieceDataActivityDate;
1744        const int idleTime = then ? (now-then) : 0;
1745        if( idleTime > limit ) {
1746            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
1747                       tr_peerIoAddrStr(&atom->addr,atom->port), idleTime );
1748            return TRUE;
1749        }
1750    }
1751
1752    return FALSE;
1753}
1754
1755static tr_peer **
1756getPeersToClose( Torrent * t, int * setmeSize )
1757{
1758    int i, peerCount, outsize;
1759    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( t->peers, &peerCount );
1760    struct tr_peer ** ret = tr_new( tr_peer*, peerCount );
1761
1762    assert( torrentIsLocked( t ) );
1763
1764    for( i=outsize=0; i<peerCount; ++i )
1765        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
1766            ret[outsize++] = peers[i];
1767
1768    *setmeSize = outsize;
1769    return ret;
1770}
1771
1772static int
1773compareCandidates( const void * va, const void * vb )
1774{
1775    const struct peer_atom * a = * (const struct peer_atom**) va;
1776    const struct peer_atom * b = * (const struct peer_atom**) vb;
1777
1778    /* <Charles> Here we would probably want to try reconnecting to
1779     * peers that had most recently given us data. Lots of users have
1780     * trouble with resets due to their routers and/or ISPs. This way we
1781     * can quickly recover from an unwanted reset. So we sort
1782     * piece_data_time in descending order.
1783     */
1784
1785    if( a->piece_data_time != b->piece_data_time )
1786        return a->piece_data_time < b->piece_data_time ? 1 : -1;
1787
1788    if( a->numFails != b->numFails )
1789        return a->numFails < b->numFails ? -1 : 1;
1790
1791    if( a->time != b->time )
1792        return a->time < b->time ? -1 : 1;
1793
1794    return 0;
1795}
1796
1797static int
1798getReconnectIntervalSecs( const struct peer_atom * atom )
1799{
1800    int sec;
1801
1802    switch( atom->numFails )
1803    {
1804        case 0: sec = 0; break;
1805        case 1: sec = 5; break;
1806        case 2: sec = 2*60; break;
1807        case 3: sec = 15*60; break;
1808        case 4: sec = 30*60; break;
1809        case 5: sec = 60*60; break;
1810        default: sec = 120*60; break;
1811    }
1812
1813    return sec;
1814}
1815
1816static struct peer_atom **
1817getPeerCandidates( Torrent * t, int * setmeSize )
1818{
1819    int i, atomCount, retCount;
1820    struct peer_atom ** atoms;
1821    struct peer_atom ** ret;
1822    const time_t now = time( NULL );
1823    const int seed = tr_torrentIsSeed( t->tor );
1824
1825    assert( torrentIsLocked( t ) );
1826
1827    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &atomCount );
1828    ret = tr_new( struct peer_atom*, atomCount );
1829    for( i=retCount=0; i<atomCount; ++i )
1830    {
1831        struct peer_atom * atom = atoms[i];
1832
1833        /* peer fed us too much bad data ... we only keep it around
1834         * now to weed it out in case someone sends it to us via pex */
1835        if( atom->myflags & MYFLAG_BANNED )
1836            continue;
1837
1838        /* peer was unconnectable before, so we're not going to keep trying.
1839         * this is needs a separate flag from `banned', since if they try
1840         * to connect to us later, we'll let them in */
1841        if( atom->myflags & MYFLAG_UNREACHABLE )
1842            continue;
1843
1844        /* we don't need two connections to the same peer... */
1845        if( peerIsInUse( t, &atom->addr ) )
1846            continue;
1847
1848        /* no need to connect if we're both seeds... */
1849        if( seed && (atom->flags & ADDED_F_SEED_FLAG) )
1850            continue;
1851
1852        /* If we were connected to this peer recently and transferring
1853         * piece data, try to reconnect -- network troubles may have
1854         * disconnected us.  but if we weren't sharing piece data,
1855         * hold off on this peer to give another one a try instead */
1856        if( ( now - atom->piece_data_time ) > 10 )
1857        {
1858            const int wait = getReconnectIntervalSecs( atom );
1859            if( ( now - atom->time ) < wait ) {
1860                tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
1861                        i, tr_peerIoAddrStr(&atom->addr,atom->port), wait );
1862                continue;
1863            }
1864        }
1865
1866        /* Don't connect to peers in our blocklist */
1867        if( tr_sessionIsAddressBlocked( t->manager->handle, &atom->addr ) )
1868            continue;
1869
1870        ret[retCount++] = atom;
1871    }
1872
1873    qsort( ret, retCount, sizeof(struct peer_atom*), compareCandidates );
1874    *setmeSize = retCount;
1875    return ret;
1876}
1877
1878static int
1879reconnectPulse( void * vtorrent )
1880{
1881    Torrent * t = vtorrent;
1882    static time_t prevTime = 0;
1883    static int newConnectionsThisSecond = 0;
1884    time_t now;
1885
1886    torrentLock( t );
1887
1888    now = time( NULL );
1889    if( prevTime != now )
1890    {
1891        prevTime = now;
1892        newConnectionsThisSecond = 0;
1893    }
1894
1895    if( !t->isRunning )
1896    {
1897        removeAllPeers( t );
1898    }
1899    else
1900    {
1901        int i, nCandidates, nBad;
1902        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
1903        struct tr_peer ** connections = getPeersToClose( t, &nBad );
1904
1905        if( nBad || nCandidates )
1906            tordbg( t, "reconnect pulse for [%s]: %d bad connections, "
1907                       "%d connection candidates, %d atoms, max per pulse is %d",
1908                       t->tor->info.name, nBad, nCandidates,
1909                       tr_ptrArraySize(t->pool),
1910                       (int)MAX_RECONNECTIONS_PER_PULSE );
1911
1912        /* disconnect some peers.
1913           if we transferred piece data, then they might be good peers,
1914           so reset their `numFails' weight to zero.  otherwise we connected
1915           to them fruitlessly, so mark it as another fail */
1916        for( i=0; i<nBad; ++i ) {
1917            tr_peer * peer = connections[i];
1918            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1919            if( peer->pieceDataActivityDate )
1920                atom->numFails = 0;
1921            else
1922                ++atom->numFails;
1923            tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
1924            removePeer( t, peer );
1925        }
1926
1927        /* add some new ones */
1928        for( i=0;    i < nCandidates
1929                  && i < MAX_RECONNECTIONS_PER_PULSE
1930                  && newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND; ++i )
1931        {
1932            tr_peerMgr * mgr = t->manager;
1933            struct peer_atom * atom = candidates[i];
1934            tr_peerIo * io;
1935
1936            tordbg( t, "Starting an OUTGOING connection with %s",
1937                       tr_peerIoAddrStr( &atom->addr, atom->port ) );
1938
1939            io = tr_peerIoNewOutgoing( mgr->handle, &atom->addr, atom->port, t->hash );
1940            if( io == NULL )
1941            {
1942                atom->myflags |= MYFLAG_UNREACHABLE;
1943            }
1944            else
1945            {
1946                tr_handshake * handshake = tr_handshakeNew( io,
1947                                                            mgr->handle->encryptionMode,
1948                                                            myHandshakeDoneCB,
1949                                                            mgr );
1950
1951                assert( tr_peerIoGetTorrentHash( io ) );
1952
1953                ++newConnectionsThisSecond;
1954
1955                tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake, handshakeCompare );
1956            }
1957
1958            atom->time = time( NULL );
1959        }
1960
1961        /* cleanup */
1962        tr_free( connections );
1963        tr_free( candidates );
1964    }
1965
1966    torrentUnlock( t );
1967    return TRUE;
1968}
Note: See TracBrowser for help on using the repository browser.