source: trunk/libtransmission/peer-mgr.c @ 6627

Last change on this file since 6627 was 6627, checked in by charles, 13 years ago

(libT) remove a little more dead code

  • Property svn:keywords set to Date Rev Author Id
File size: 55.6 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 6627 2008-08-22 15:33:55Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp, strstr */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "blocklist.h"
23#include "clients.h"
24#include "completion.h"
25#include "crypto.h"
26#include "handshake.h"
27#include "inout.h" /* tr_ioTestPiece */
28#include "net.h"
29#include "peer-io.h"
30#include "peer-mgr.h"
31#include "peer-mgr-private.h"
32#include "peer-msgs.h"
33#include "ptrarray.h"
34#include "ratecontrol.h"
35#include "stats.h" /* tr_statsAddDownloaded */
36#include "torrent.h"
37#include "trevent.h"
38#include "utils.h"
39#include "webseed.h"
40
41enum
42{
43    /* how frequently to change which peers are choked */
44    RECHOKE_PERIOD_MSEC = (10 * 1000),
45
46    /* minimum interval for refilling peers' request lists */
47    REFILL_PERIOD_MSEC = 333,
48
49    /* when many peers are available, keep idle ones this long */
50    MIN_UPLOAD_IDLE_SECS = (60 * 3),
51
52    /* when few peers are available, keep idle ones this long */
53    MAX_UPLOAD_IDLE_SECS = (60 * 10),
54
55    /* how frequently to decide which peers live and die */
56    RECONNECT_PERIOD_MSEC = (2 * 1000),
57
58    /* max # of peers to ask fer per torrent per reconnect pulse */
59    MAX_RECONNECTIONS_PER_PULSE = 2,
60
61    /* max number of peers to ask for per second overall.
62     * this throttle is to avoid overloading the router */
63    MAX_CONNECTIONS_PER_SECOND = 4,
64
65    /* number of unchoked peers per torrent.
66     * FIXME: this probably ought to be configurable */
67    MAX_UNCHOKED_PEERS = 12,
68
69    /* number of bad pieces a peer is allowed to send before we ban them */
70    MAX_BAD_PIECES_PER_PEER = 3,
71
72    /* use for bitwise operations w/peer_atom.myflags */
73    MYFLAG_BANNED = 1,
74
75    /* unreachable for now... but not banned.  if they try to connect to us it's okay */
76    MYFLAG_UNREACHABLE = 2
77};
78
79
80/**
81***
82**/
83
84/* We keep one of these for every peer we know about, whether
85 * it's connected or not, so the struct must be small.
86 * When our current connections underperform, we dip back
87 * into this list for new ones. */
88struct peer_atom
89{   
90    uint8_t from;
91    uint8_t flags; /* these match the added_f flags */
92    uint8_t myflags; /* flags that aren't defined in added_f */
93    uint16_t port;
94    uint16_t numFails;
95    struct in_addr addr;
96    time_t time;
97    time_t piece_data_time;
98};
99
100typedef struct
101{
102    uint8_t hash[SHA_DIGEST_LENGTH];
103    tr_ptrArray * outgoingHandshakes; /* tr_handshake */
104    tr_ptrArray * pool; /* struct peer_atom */
105    tr_ptrArray * peers; /* tr_peer */
106    tr_ptrArray * webseeds; /* tr_webseed */
107    tr_timer * reconnectTimer;
108    tr_timer * rechokeTimer;
109    tr_timer * refillTimer;
110    tr_torrent * tor;
111    tr_peer * optimistic; /* the optimistic peer, or NULL if none */
112    tr_bitfield * requestedPieces;
113
114    unsigned int isRunning : 1;
115
116    struct tr_peerMgr * manager;
117}
118Torrent;
119
120struct tr_peerMgr
121{
122    tr_handle * handle;
123    tr_ptrArray * torrents; /* Torrent */
124    tr_ptrArray * incomingHandshakes; /* tr_handshake */
125};
126
127#define tordbg(t, fmt...) tr_deepLog( __FILE__, __LINE__, t->tor->info.name, ##fmt )
128
129/**
130***
131**/
132
133/* The following is not a very high quality random number generator.  It
134 * is mainly used as a simple stupid random number generator inside some
135 * functions below. Please don't use it for anything else unless you
136 * know what you're doing. Use tr_cryptoRandInt() instead.
137 */
138
139static int
140tr_stupidRandInt( int sup )
141{
142    static int init = 0;
143    assert( sup > 0 );
144
145    if( !init )
146    {
147        srand( tr_date() );
148        init = 1;
149    }
150
151    return rand() % sup;
152}
153
154static void
155managerLock( const struct tr_peerMgr * manager )
156{
157    tr_globalLock( manager->handle );
158}
159static void
160managerUnlock( const struct tr_peerMgr * manager )
161{
162    tr_globalUnlock( manager->handle );
163}
164static void
165torrentLock( Torrent * torrent )
166{
167    managerLock( torrent->manager );
168}
169static void
170torrentUnlock( Torrent * torrent )
171{
172    managerUnlock( torrent->manager );
173}
174static int
175torrentIsLocked( const Torrent * t )
176{
177    return tr_globalIsLocked( t->manager->handle );
178}
179
180/**
181***
182**/
183
184static int
185compareAddresses( const struct in_addr * a, const struct in_addr * b )
186{
187    if( a->s_addr != b->s_addr )
188        return a->s_addr < b->s_addr ? -1 : 1;
189
190    return 0;
191}
192
193static int
194handshakeCompareToAddr( const void * va, const void * vb )
195{
196    const tr_handshake * a = va;
197    return compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
198}
199
200static int
201handshakeCompare( const void * a, const void * b )
202{
203    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
204}
205
206static tr_handshake*
207getExistingHandshake( tr_ptrArray * handshakes, const struct in_addr * in_addr )
208{
209    return tr_ptrArrayFindSorted( handshakes,
210                                  in_addr,
211                                  handshakeCompareToAddr );
212}
213
214static int
215comparePeerAtomToAddress( const void * va, const void * vb )
216{
217    const struct peer_atom * a = va;
218    return compareAddresses( &a->addr, vb );
219}
220
221static int
222comparePeerAtoms( const void * va, const void * vb )
223{
224    const struct peer_atom * b = vb;
225    return comparePeerAtomToAddress( va, &b->addr );
226}
227
228/**
229***
230**/
231
232static int
233torrentCompare( const void * va, const void * vb )
234{
235    const Torrent * a = va;
236    const Torrent * b = vb;
237    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
238}
239
240static int
241torrentCompareToHash( const void * va, const void * vb )
242{
243    const Torrent * a = va;
244    const uint8_t * b_hash = vb;
245    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
246}
247
248static Torrent*
249getExistingTorrent( tr_peerMgr * manager, const uint8_t * hash )
250{
251    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
252                                             hash,
253                                             torrentCompareToHash );
254}
255
256static int
257peerCompare( const void * va, const void * vb )
258{
259    const tr_peer * a = va;
260    const tr_peer * b = vb;
261    return compareAddresses( &a->in_addr, &b->in_addr );
262}
263
264static int
265peerCompareToAddr( const void * va, const void * vb )
266{
267    const tr_peer * a = va;
268    return compareAddresses( &a->in_addr, vb );
269}
270
271static tr_peer*
272getExistingPeer( Torrent * torrent, const struct in_addr * in_addr )
273{
274    assert( torrentIsLocked( torrent ) );
275    assert( in_addr );
276
277    return tr_ptrArrayFindSorted( torrent->peers,
278                                  in_addr,
279                                  peerCompareToAddr );
280}
281
282static struct peer_atom*
283getExistingAtom( const Torrent * t, const struct in_addr * addr )
284{
285    assert( torrentIsLocked( t ) );
286    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
287}
288
289static int
290peerIsInUse( const Torrent * ct, const struct in_addr * addr )
291{
292    Torrent * t = (Torrent*) ct;
293
294    assert( torrentIsLocked ( t ) );
295
296    return getExistingPeer( t, addr )
297        || getExistingHandshake( t->outgoingHandshakes, addr )
298        || getExistingHandshake( t->manager->incomingHandshakes, addr );
299}
300
301static tr_peer*
302peerConstructor( const struct in_addr * in_addr )
303{
304    tr_peer * p;
305    p = tr_new0( tr_peer, 1 );
306    p->rcToClient = tr_rcInit( );
307    p->rcToPeer = tr_rcInit( );
308    memcpy( &p->in_addr, in_addr, sizeof(struct in_addr) );
309    return p;
310}
311
312static tr_peer*
313getPeer( Torrent * torrent, const struct in_addr * in_addr )
314{
315    tr_peer * peer;
316
317    assert( torrentIsLocked( torrent ) );
318
319    peer = getExistingPeer( torrent, in_addr );
320
321    if( peer == NULL ) {
322        peer = peerConstructor( in_addr );
323        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
324    }
325
326    return peer;
327}
328
329static void
330peerDestructor( tr_peer * peer )
331{
332    assert( peer );
333    assert( peer->msgs );
334
335    tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
336    tr_peerMsgsFree( peer->msgs );
337
338    tr_peerIoFree( peer->io );
339
340    tr_bitfieldFree( peer->have );
341    tr_bitfieldFree( peer->blame );
342    tr_rcClose( peer->rcToClient );
343    tr_rcClose( peer->rcToPeer );
344    tr_free( peer->client );
345    tr_free( peer );
346}
347
348static void
349removePeer( Torrent * t, tr_peer * peer )
350{
351    tr_peer * removed;
352    struct peer_atom * atom;
353
354    assert( torrentIsLocked( t ) );
355
356    atom = getExistingAtom( t, &peer->in_addr );
357    assert( atom );
358    atom->time = time( NULL );
359
360    removed = tr_ptrArrayRemoveSorted( t->peers, peer, peerCompare );
361    assert( removed == peer );
362    peerDestructor( removed );
363}
364
365static void
366removeAllPeers( Torrent * t )
367{
368    while( !tr_ptrArrayEmpty( t->peers ) )
369        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
370}
371
372static void
373torrentDestructor( Torrent * t )
374{
375    uint8_t hash[SHA_DIGEST_LENGTH];
376
377    assert( t );
378    assert( !t->isRunning );
379    assert( t->peers );
380    assert( torrentIsLocked( t ) );
381    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
382    assert( tr_ptrArrayEmpty( t->peers ) );
383
384    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
385
386    tr_timerFree( &t->reconnectTimer );
387    tr_timerFree( &t->rechokeTimer );
388    tr_timerFree( &t->refillTimer );
389
390    tr_bitfieldFree( t->requestedPieces );
391    tr_ptrArrayFree( t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
392    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
393    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
394    tr_ptrArrayFree( t->peers, NULL );
395
396    tr_free( t );
397}
398
399static void peerCallbackFunc( void * vpeer, void * vevent, void * vt );
400
401static Torrent*
402torrentConstructor( tr_peerMgr * manager, tr_torrent * tor )
403{
404    int i;
405    Torrent * t;
406
407    t = tr_new0( Torrent, 1 );
408    t->manager = manager;
409    t->tor = tor;
410    t->pool = tr_ptrArrayNew( );
411    t->peers = tr_ptrArrayNew( );
412    t->webseeds = tr_ptrArrayNew( );
413    t->outgoingHandshakes = tr_ptrArrayNew( );
414    t->requestedPieces = tr_bitfieldNew( tor->info.pieceCount );
415    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
416
417    for( i=0; i<tor->info.webseedCount; ++i ) {
418        tr_webseed * w = tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
419        tr_ptrArrayAppend( t->webseeds, w );
420    }
421
422    return t;
423}
424
425/**
426 * For explanation, see http://www.bittorrent.org/fast_extensions.html
427 * Also see the "test-allowed-set" unit test
428 */
429struct tr_bitfield *
430tr_peerMgrGenerateAllowedSet( const uint32_t         k,         /* number of pieces in set */
431                              const uint32_t         sz,        /* number of pieces in torrent */
432                              const uint8_t        * infohash,  /* torrent's SHA1 hash*/
433                              const struct in_addr * ip )       /* peer's address */
434{
435    uint8_t w[SHA_DIGEST_LENGTH + 4];
436    uint8_t x[SHA_DIGEST_LENGTH];
437    tr_bitfield * a;
438    uint32_t a_size;
439
440    *(uint32_t*)w = ntohl( htonl(ip->s_addr) & 0xffffff00 );   /* (1) */
441    memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );              /* (2) */
442    tr_sha1( x, w, sizeof( w ), NULL );                        /* (3) */
443
444    a = tr_bitfieldNew( sz );
445    a_size = 0;
446   
447    while( a_size < k )
448    {
449        int i;
450        for ( i=0; i<5 && a_size<k; ++i )                      /* (4) */
451        {
452            uint32_t j = i * 4;                                /* (5) */
453            uint32_t y = ntohl(*(uint32_t*)(x+j));             /* (6) */
454            uint32_t index = y % sz;                           /* (7) */
455            if( !tr_bitfieldHas( a, index ) ) {                /* (8) */
456                tr_bitfieldAdd( a, index );                    /* (9) */
457                ++a_size;
458            }
459        }
460        tr_sha1( x, x, sizeof( x ), NULL );                    /* (3) */
461    }
462   
463    return a;
464}
465
466tr_peerMgr*
467tr_peerMgrNew( tr_handle * handle )
468{
469    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
470    m->handle = handle;
471    m->torrents = tr_ptrArrayNew( );
472    m->incomingHandshakes = tr_ptrArrayNew( );
473    return m;
474}
475
476void
477tr_peerMgrFree( tr_peerMgr * manager )
478{
479    managerLock( manager );
480
481    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
482     * the item from manager->handshakes, so this is a little roundabout... */
483    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
484        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
485    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
486
487    /* free the torrents. */
488    tr_ptrArrayFree( manager->torrents, (PtrArrayForeachFunc)torrentDestructor );
489
490    managerUnlock( manager );
491    tr_free( manager );
492}
493
494static tr_peer**
495getConnectedPeers( Torrent * t, int * setmeCount )
496{
497    int i, peerCount, connectionCount;
498    tr_peer **peers;
499    tr_peer **ret;
500
501    assert( torrentIsLocked( t ) );
502
503    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
504    ret = tr_new( tr_peer*, peerCount );
505
506    for( i=connectionCount=0; i<peerCount; ++i )
507        if( peers[i]->msgs )
508            ret[connectionCount++] = peers[i];
509
510    *setmeCount = connectionCount;
511    return ret;
512}
513
514static int
515clientIsDownloadingFrom( const tr_peer * peer )
516{
517    return peer->clientIsInterested && !peer->clientIsChoked;
518}
519
520static int
521clientIsUploadingTo( const tr_peer * peer )
522{
523    return peer->peerIsInterested && !peer->peerIsChoked;
524}
525
526/***
527****
528***/
529
530int
531tr_peerMgrPeerIsSeed( const tr_peerMgr       * mgr,
532                      const uint8_t          * torrentHash,
533                      const struct in_addr   * addr )
534{
535    int isSeed = FALSE;
536    const Torrent * t = NULL;
537    const struct peer_atom * atom = NULL;
538
539    t = getExistingTorrent( (tr_peerMgr*)mgr, torrentHash );
540    if( t )
541        atom = getExistingAtom( t, addr );
542    if( atom )
543        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
544
545    return isSeed;
546}
547
548/***
549****  Refill
550***/
551
552struct tr_refill_piece
553{
554    tr_priority_t priority;
555    int missingBlockCount;
556    int random;
557    uint32_t piece;
558    uint32_t peerCount;
559};
560
561static int
562compareRefillPiece (const void * aIn, const void * bIn)
563{
564    const struct tr_refill_piece * a = aIn;
565    const struct tr_refill_piece * b = bIn;
566
567    /* if one piece has a higher priority, it goes first */
568    if( a->priority != b->priority )
569        return a->priority > b->priority ? -1 : 1;
570
571    /* otherwise if one has fewer peers, it goes first */
572    if( a->peerCount != b->peerCount )
573        return a->peerCount < b->peerCount ? -1 : 1;
574
575    /* otherwise go with our random seed */
576    if( a->random != b->random )
577        return a->random < b->random ? -1 : 1;
578
579    return 0;
580}
581
582static int
583isPieceInteresting( const tr_torrent  * tor,
584                    tr_piece_index_t    piece )
585{
586    if( tor->info.pieces[piece].dnd ) /* we don't want it */
587        return 0;
588
589    if( tr_cpPieceIsComplete( tor->completion, piece ) ) /* we have it */
590        return 0;
591
592    return 1;
593}
594
595static uint32_t*
596getPreferredPieces( Torrent     * t,
597                    uint32_t    * pieceCount )
598{
599    const tr_torrent * tor = t->tor;
600    const tr_info * inf = &tor->info;
601    tr_piece_index_t i;
602    uint32_t poolSize = 0;
603    uint32_t * pool = tr_new( uint32_t, inf->pieceCount );
604    int peerCount;
605    tr_peer** peers;
606
607    assert( torrentIsLocked( t ) );
608
609    peers = getConnectedPeers( t, &peerCount );
610
611    for( i=0; i<inf->pieceCount; ++i )
612        if( isPieceInteresting( tor, i ) )
613            pool[poolSize++] = i;
614
615    /* sort the pool from most interesting to least... */
616    if( poolSize > 1 )
617    {
618        uint32_t j;
619        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
620
621        for( j=0; j<poolSize; ++j )
622        {
623            int k;
624            const tr_piece_index_t piece = pool[j];
625            struct tr_refill_piece * setme = p + j;
626
627            setme->piece = piece;
628            setme->priority = inf->pieces[piece].priority;
629            setme->peerCount = 0;
630            setme->random = tr_stupidRandInt( INT_MAX );
631            setme->missingBlockCount = tr_cpMissingBlocksInPiece( tor->completion, piece );
632
633            for( k=0; k<peerCount; ++k ) {
634                const tr_peer * peer = peers[k];
635                if( peer->peerIsInterested && !peer->clientIsChoked && tr_bitfieldHas( peer->have, piece ) )
636                    ++setme->peerCount;
637            }
638        }
639
640        qsort( p, poolSize, sizeof(struct tr_refill_piece), compareRefillPiece );
641
642        for( j=0; j<poolSize; ++j )
643            pool[j] = p[j].piece;
644
645        tr_free( p );
646    }
647
648    tr_free( peers );
649
650    *pieceCount = poolSize;
651    return pool;
652}
653
654static tr_peer**
655getPeersUploadingToClient( Torrent * t, int * setmeCount )
656{
657    int i;
658    int peerCount = 0;
659    int retCount = 0;
660    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
661    tr_peer ** ret = tr_new( tr_peer*, peerCount );
662
663    /* get a list of peers we're downloading from */
664    for( i=0; i<peerCount; ++i )
665        if( clientIsDownloadingFrom( peers[i] ) )
666            ret[retCount++] = peers[i];
667
668    /* pick a different starting point each time so all peers
669     * get a chance at the first blocks in the queue */
670    if( retCount ) {
671        tr_peer ** tmp = tr_new( tr_peer*, retCount );
672        i = tr_stupidRandInt( retCount );
673        memcpy( tmp, ret, sizeof(tr_peer*) * retCount );
674        memcpy( ret, tmp+i, sizeof(tr_peer*) * (retCount-i) );
675        memcpy( ret+(retCount-i), tmp, sizeof(tr_peer*) * i );
676        tr_free( tmp );
677    }
678
679    *setmeCount = retCount;
680    return ret;
681}
682
683static int
684refillPulse( void * vtorrent )
685{
686    Torrent * t = vtorrent;
687    tr_torrent * tor = t->tor;
688    int peerCount;
689    int webseedCount;
690    tr_peer ** peers;
691    tr_webseed ** webseeds;
692    uint32_t pieceCount;
693    uint32_t * pieces;
694    tr_piece_index_t i;
695
696    if( !t->isRunning )
697        return TRUE;
698    if( tr_torrentIsSeed( t->tor ) )
699        return TRUE;
700
701    torrentLock( t );
702    tordbg( t, "Refilling Request Buffers..." );
703
704    pieces = getPreferredPieces( t, &pieceCount );
705    peers = getPeersUploadingToClient( t, &peerCount );
706    webseedCount = tr_ptrArraySize( t->webseeds );
707    webseeds = tr_memdup( tr_ptrArrayBase(t->webseeds), webseedCount*sizeof(tr_webseed*) );
708
709    for( i=0; (webseedCount || peerCount) && i<pieceCount; ++i )
710    {
711        int j;
712        int handled = FALSE;
713        const tr_piece_index_t piece = pieces[i];
714
715        assert( piece < tor->info.pieceCount );
716
717        /* find a peer who can ask for this piece */
718        for( j=0; !handled && j<peerCount; )
719        {
720            const int val = tr_peerMsgsAddRequest( peers[j]->msgs, piece );
721            switch( val )
722            {
723                case TR_ADDREQ_FULL: 
724                case TR_ADDREQ_CLIENT_CHOKED:
725                    memmove( peers+j, peers+j+1, sizeof(tr_peer*)*(--peerCount-j) );
726                    break;
727                case TR_ADDREQ_MISSING: 
728                case TR_ADDREQ_DUPLICATE: 
729                    ++j;
730                    break;
731                case TR_ADDREQ_OK:
732                    tr_bitfieldAdd( t->requestedPieces, piece );
733                    handled = TRUE;
734                    break;
735                default:
736                    assert( 0 && "unhandled value" );
737                    break;
738            }
739        }
740
741        /* maybe one of the webseeds can do it */
742        for( j=0; !handled && j<webseedCount; )
743        {
744            const int val = tr_webseedAddRequest( webseeds[j], piece );
745            switch( val )
746            {
747                case TR_ADDREQ_FULL: 
748                    memmove( webseeds+j, webseeds+j+1, sizeof(tr_webseed*)*(--webseedCount-j) );
749                    break;
750                case TR_ADDREQ_OK:
751                    tr_bitfieldAdd( t->requestedPieces, piece );
752                    handled = TRUE;
753                    break;
754                default:
755                    assert( 0 && "unhandled value" );
756                    break;
757            }
758        }
759    }
760
761    /* cleanup */
762    tr_free( webseeds );
763    tr_free( peers );
764    tr_free( pieces );
765
766    t->refillTimer = NULL;
767    torrentUnlock( t );
768    return FALSE;
769}
770
771static void
772addStrike( Torrent * t, tr_peer * peer )
773{
774    tordbg( t, "increasing peer %s strike count to %d", tr_peerIoAddrStr(&peer->in_addr,peer->port), peer->strikes+1 );
775
776    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
777    {
778        struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
779        atom->myflags |= MYFLAG_BANNED;
780        peer->doPurge = 1;
781        tordbg( t, "banning peer %s", tr_peerIoAddrStr(&atom->addr,atom->port) );
782    }
783}
784
785static void
786gotBadPiece( Torrent * t, tr_piece_index_t pieceIndex )
787{
788    tr_torrent * tor = t->tor;
789    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
790    tor->corruptCur += byteCount;
791    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
792}
793
794static void
795refillSoon( Torrent * t )
796{
797    if( t->refillTimer == NULL )
798        t->refillTimer = tr_timerNew( t->manager->handle,
799                                      refillPulse, t,
800                                      REFILL_PERIOD_MSEC );
801}
802
803static void
804peerCallbackFunc( void * vpeer, void * vevent, void * vt )
805{
806    tr_peer * peer = vpeer; /* may be NULL if peer is a webseed */
807    Torrent * t = (Torrent *) vt;
808    const tr_peer_event * e = vevent;
809
810    torrentLock( t );
811
812    switch( e->eventType )
813    {
814        case TR_PEER_NEED_REQ:
815            refillSoon( t );
816            break;
817
818        case TR_PEER_CANCEL:
819            tr_bitfieldRem( t->requestedPieces, e->pieceIndex );
820            break;
821
822        case TR_PEER_PEER_GOT_DATA: {
823            const time_t now = time( NULL );
824            tr_torrent * tor = t->tor;
825            tor->activityDate = now;
826            tor->uploadedCur += e->length;
827            tr_rcTransferred( tor->upload, e->length );
828            tr_rcTransferred( tor->handle->upload, e->length );
829            tr_statsAddUploaded( tor->handle, e->length );
830            if( peer ) {
831                struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
832                atom->piece_data_time = time( NULL );
833            }
834            break;
835        }
836
837        case TR_PEER_CLIENT_GOT_DATA: {
838            const time_t now = time( NULL );
839            tr_torrent * tor = t->tor;
840            tor->activityDate = now;
841            /* only add this to downloadedCur if we got it from a peer --
842             * webseeds shouldn't count against our ratio.  As one tracker
843             * admin put it, "Those pieces are downloaded directly from the
844             * content distributor, not the peers, it is the tracker's job
845             * to manage the swarms, not the web server and does not fit
846             * into the jurisdiction of the tracker." */
847            if( peer )
848                tor->downloadedCur += e->length;
849            tr_rcTransferred( tor->download, e->length );
850            tr_rcTransferred( tor->handle->download, e->length );
851            tr_statsAddDownloaded( tor->handle, e->length );
852            if( peer ) {
853                struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
854                atom->piece_data_time = time( NULL );
855            }
856            break;
857        }
858
859        case TR_PEER_PEER_PROGRESS: {
860            if( peer ) {
861                struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
862                const int peerIsSeed = e->progress >= 1.0;
863                if( peerIsSeed ) {
864                    tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr(&atom->addr,atom->port) );
865                    atom->flags |= ADDED_F_SEED_FLAG;
866                } else {
867                    tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr(&atom->addr,atom->port) );
868                    atom->flags &= ~ADDED_F_SEED_FLAG;
869                }
870            }
871            break;
872        }
873
874        case TR_PEER_CLIENT_GOT_BLOCK:
875        {
876            tr_torrent * tor = t->tor;
877
878            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
879
880            tr_cpBlockAdd( tor->completion, block );
881
882            if( tr_cpPieceIsComplete( tor->completion, e->pieceIndex ) )
883            {
884                const tr_piece_index_t p = e->pieceIndex;
885                const tr_errno err = tr_ioTestPiece( tor, p );
886
887                if( err ) {
888                    tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test: %s" ),
889                               (unsigned long)p, tr_errorString( err ) );
890                }
891
892                tr_torrentSetHasPiece( tor, p, !err );
893                tr_torrentSetPieceChecked( tor, p, TRUE );
894                tr_peerMgrSetBlame( tor->handle->peerMgr, tor->info.hash, p, !err );
895
896                if( err )
897                    gotBadPiece( t, p );
898                else {
899                    int i, peerCount;
900                    tr_peer ** peers = getConnectedPeers( t, &peerCount );
901                    for( i=0; i<peerCount; ++i )
902                        tr_peerMsgsHave( peers[i]->msgs, p );
903                    tr_free( peers );
904                }
905
906                tr_torrentRecheckCompleteness( tor );
907            }
908            break;
909        }
910
911        case TR_PEER_ERROR:
912            if( TR_ERROR_IS_IO( e->err ) ) {
913                t->tor->error = e->err;
914                tr_strlcpy( t->tor->errorString, tr_errorString( e->err ), sizeof(t->tor->errorString) );
915                tr_torrentStop( t->tor );
916            } else if( e->err == TR_ERROR_ASSERT ) {
917                addStrike( t, peer );
918            }
919            peer->doPurge = 1;
920            break;
921
922        default:
923            assert(0);
924    }
925
926    torrentUnlock( t );
927}
928
929static void
930ensureAtomExists( Torrent * t, const struct in_addr * addr, uint16_t port, uint8_t flags, uint8_t from )
931{
932    if( getExistingAtom( t, addr ) == NULL )
933    {
934        struct peer_atom * a;
935        a = tr_new0( struct peer_atom, 1 );
936        a->addr = *addr;
937        a->port = port;
938        a->flags = flags;
939        a->from = from;
940        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr(&a->addr,a->port) );
941        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
942    }
943}
944
945static int
946getMaxPeerCount( const tr_torrent * tor UNUSED )
947{
948    return tor->maxConnectedPeers;
949}
950
951/* FIXME: this is kind of a mess. */
952static void
953myHandshakeDoneCB( tr_handshake    * handshake,
954                   tr_peerIo       * io,
955                   int               isConnected,
956                   const uint8_t   * peer_id,
957                   void            * vmanager )
958{
959    int ok = isConnected;
960    uint16_t port;
961    const struct in_addr * addr;
962    tr_peerMgr * manager = (tr_peerMgr*) vmanager;
963    Torrent * t;
964    tr_handshake * ours;
965
966    assert( io );
967    assert( isConnected==0 || isConnected==1 );
968
969    t = tr_peerIoHasTorrentHash( io )
970        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
971        : NULL;
972
973    if( tr_peerIoIsIncoming ( io ) )
974        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
975                                        handshake, handshakeCompare );
976    else if( t )
977        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
978                                        handshake, handshakeCompare );
979    else
980        ours = handshake;
981
982    assert( ours );
983    assert( ours == handshake );
984
985    if( t )
986        torrentLock( t );
987
988    addr = tr_peerIoGetAddress( io, &port );
989
990    if( !ok || !t || !t->isRunning )
991    {
992        if( t ) {
993            struct peer_atom * atom = getExistingAtom( t, addr );
994            if( atom )
995                ++atom->numFails;
996        }
997
998        tr_peerIoFree( io );
999    }
1000    else /* looking good */
1001    {
1002        struct peer_atom * atom;
1003        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1004        atom = getExistingAtom( t, addr );
1005
1006        if( atom->myflags & MYFLAG_BANNED )
1007        {
1008            tordbg( t, "banned peer %s tried to reconnect", tr_peerIoAddrStr(&atom->addr,atom->port) );
1009            tr_peerIoFree( io );
1010        }
1011        else if( tr_ptrArraySize( t->peers ) >= getMaxPeerCount( t->tor ) )
1012        {
1013            tr_peerIoFree( io );
1014        }
1015        else
1016        {
1017            tr_peer * peer = getExistingPeer( t, addr );
1018
1019            if( peer ) /* we already have this peer */
1020            {
1021                tr_peerIoFree( io );
1022            }
1023            else
1024            {
1025                peer = getPeer( t, addr );
1026                tr_free( peer->client );
1027
1028                if( !peer_id )
1029                    peer->client = NULL;
1030                else {
1031                    char client[128];
1032                    tr_clientForId( client, sizeof( client ), peer_id );
1033                    peer->client = tr_strdup( client );
1034                }
1035                peer->port = port;
1036                peer->io = io;
1037                peer->msgs = tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1038                atom->time = time( NULL );
1039            }
1040        }
1041    }
1042
1043    if( t )
1044        torrentUnlock( t );
1045}
1046
1047void
1048tr_peerMgrAddIncoming( tr_peerMgr      * manager,
1049                       struct in_addr  * addr,
1050                       uint16_t          port,
1051                       int               socket )
1052{
1053    managerLock( manager );
1054
1055    if( tr_sessionIsAddressBlocked( manager->handle, addr ) )
1056    {
1057        tr_dbg( "Banned IP address \"%s\" tried to connect to us",
1058                inet_ntoa( *addr ) );
1059        tr_netClose( socket );
1060    }
1061    else if( getExistingHandshake( manager->incomingHandshakes, addr ) )
1062    {
1063        tr_netClose( socket );
1064    }
1065    else /* we don't have a connetion to them yet... */
1066    {
1067        tr_peerIo * io;
1068        tr_handshake * handshake;
1069
1070        io = tr_peerIoNewIncoming( manager->handle, addr, port, socket );
1071
1072        handshake = tr_handshakeNew( io,
1073                                     manager->handle->encryptionMode,
1074                                     myHandshakeDoneCB,
1075                                     manager );
1076
1077        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake, handshakeCompare );
1078    }
1079
1080    managerUnlock( manager );
1081}
1082
1083void
1084tr_peerMgrAddPex( tr_peerMgr     * manager,
1085                  const uint8_t  * torrentHash,
1086                  uint8_t          from,
1087                  const tr_pex   * pex )
1088{
1089    Torrent * t;
1090    managerLock( manager );
1091
1092    t = getExistingTorrent( manager, torrentHash );
1093    if( !tr_sessionIsAddressBlocked( t->manager->handle, &pex->in_addr ) )
1094        ensureAtomExists( t, &pex->in_addr, pex->port, pex->flags, from );
1095
1096    managerUnlock( manager );
1097}
1098
1099tr_pex *
1100tr_peerMgrCompactToPex( const void     * compact,
1101                        size_t           compactLen,
1102                        const uint8_t  * added_f,
1103                        size_t           added_f_len,
1104                        size_t         * pexCount )
1105{
1106    size_t i;
1107    size_t n = compactLen / 6;
1108    const uint8_t * walk = compact;
1109    tr_pex * pex = tr_new0( tr_pex, n );
1110
1111    for( i=0; i<n; ++i ) {
1112        memcpy( &pex[i].in_addr, walk, 4 ); walk += 4;
1113        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1114        if( added_f && ( n == added_f_len ) )
1115            pex[i].flags = added_f[i];
1116    }
1117
1118    *pexCount = n;
1119    return pex;
1120}
1121
1122/**
1123***
1124**/
1125
1126void
1127tr_peerMgrSetBlame( tr_peerMgr     * manager,
1128                    const uint8_t  * torrentHash,
1129                    tr_piece_index_t pieceIndex,
1130                    int              success )
1131{
1132    if( !success )
1133    {
1134        int peerCount, i;
1135        Torrent * t = getExistingTorrent( manager, torrentHash );
1136        tr_peer ** peers;
1137
1138        assert( torrentIsLocked( t ) );
1139
1140        peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1141        for( i=0; i<peerCount; ++i )
1142        {
1143            tr_peer * peer = peers[i];
1144            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1145            {
1146                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1147                           tr_peerIoAddrStr(&peer->in_addr,peer->port),
1148                           pieceIndex, (int)peer->strikes+1 );
1149                addStrike( t, peer );
1150            }
1151        }
1152    }
1153}
1154
1155int
1156tr_pexCompare( const void * va, const void * vb )
1157{
1158    const tr_pex * a = va;
1159    const tr_pex * b = vb;
1160    int i = memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
1161    if( i ) return i;
1162    if( a->port < b->port ) return -1;
1163    if( a->port > b->port ) return 1;
1164    return 0;
1165}
1166
1167int tr_pexCompare( const void * a, const void * b );
1168
1169static int
1170peerPrefersCrypto( const tr_peer * peer )
1171{
1172    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1173        return TRUE;
1174
1175    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1176        return FALSE;
1177
1178    return tr_peerIoIsEncrypted( peer->io );
1179};
1180
1181int
1182tr_peerMgrGetPeers( tr_peerMgr      * manager,
1183                    const uint8_t   * torrentHash,
1184                    tr_pex         ** setme_pex )
1185{
1186    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1187    int i, peerCount;
1188    const tr_peer ** peers;
1189    tr_pex * pex;
1190    tr_pex * walk;
1191
1192    managerLock( manager );
1193
1194    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1195    pex = walk = tr_new( tr_pex, peerCount );
1196
1197    for( i=0; i<peerCount; ++i, ++walk )
1198    {
1199        const tr_peer * peer = peers[i];
1200
1201        walk->in_addr = peer->in_addr;
1202
1203        walk->port = peer->port;
1204
1205        walk->flags = 0;
1206        if( peerPrefersCrypto(peer) )  walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1207        if( peer->progress >= 1.0 )    walk->flags |= ADDED_F_SEED_FLAG;
1208    }
1209
1210    assert( ( walk - pex ) == peerCount );
1211    qsort( pex, peerCount, sizeof(tr_pex), tr_pexCompare );
1212    *setme_pex = pex;
1213
1214    managerUnlock( manager );
1215
1216    return peerCount;
1217}
1218
1219static int reconnectPulse( void * vtorrent );
1220static int rechokePulse( void * vtorrent );
1221
1222void
1223tr_peerMgrStartTorrent( tr_peerMgr     * manager,
1224                        const uint8_t  * torrentHash )
1225{
1226    Torrent * t;
1227
1228    managerLock( manager );
1229
1230    t = getExistingTorrent( manager, torrentHash );
1231
1232    assert( t );
1233    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1234    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1235
1236    if( !t->isRunning )
1237    {
1238        t->isRunning = 1;
1239
1240        t->reconnectTimer = tr_timerNew( t->manager->handle,
1241                                         reconnectPulse, t,
1242                                         RECONNECT_PERIOD_MSEC );
1243
1244        t->rechokeTimer = tr_timerNew( t->manager->handle,
1245                                       rechokePulse, t,
1246                                       RECHOKE_PERIOD_MSEC );
1247
1248        reconnectPulse( t );
1249
1250        rechokePulse( t );
1251
1252        if( !tr_ptrArrayEmpty( t->webseeds ) )
1253            refillSoon( t );
1254    }
1255
1256    managerUnlock( manager );
1257}
1258
1259static void
1260stopTorrent( Torrent * t )
1261{
1262    assert( torrentIsLocked( t ) );
1263
1264    t->isRunning = 0;
1265    tr_timerFree( &t->rechokeTimer );
1266    tr_timerFree( &t->reconnectTimer );
1267
1268    /* disconnect the peers. */
1269    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)peerDestructor );
1270    tr_ptrArrayClear( t->peers );
1271
1272    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1273     * which removes the handshake from t->outgoingHandshakes... */
1274    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1275        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1276}
1277void
1278tr_peerMgrStopTorrent( tr_peerMgr     * manager,
1279                       const uint8_t  * torrentHash)
1280{
1281    managerLock( manager );
1282
1283    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1284
1285    managerUnlock( manager );
1286}
1287
1288void
1289tr_peerMgrAddTorrent( tr_peerMgr * manager,
1290                      tr_torrent * tor )
1291{
1292    Torrent * t;
1293
1294    managerLock( manager );
1295
1296    assert( tor );
1297    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1298
1299    t = torrentConstructor( manager, tor );
1300    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1301
1302    managerUnlock( manager );
1303}
1304
1305void
1306tr_peerMgrRemoveTorrent( tr_peerMgr     * manager,
1307                         const uint8_t  * torrentHash )
1308{
1309    Torrent * t;
1310
1311    managerLock( manager );
1312
1313    t = getExistingTorrent( manager, torrentHash );
1314    assert( t );
1315    stopTorrent( t );
1316    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1317    torrentDestructor( t );
1318
1319    managerUnlock( manager );
1320}
1321
1322void
1323tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1324                               const uint8_t    * torrentHash,
1325                               int8_t           * tab,
1326                               unsigned int       tabCount )
1327{
1328    tr_piece_index_t i;
1329    const Torrent * t;
1330    const tr_torrent * tor;
1331    float interval;
1332    int isComplete;
1333    int peerCount;
1334    const tr_peer ** peers;
1335
1336    managerLock( manager );
1337
1338    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1339    tor = t->tor;
1340    interval = tor->info.pieceCount / (float)tabCount;
1341    isComplete = tor && ( tr_cpGetStatus ( tor->completion ) == TR_CP_COMPLETE );
1342    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1343
1344    memset( tab, 0, tabCount );
1345
1346    for( i=0; tor && i<tabCount; ++i )
1347    {
1348        const int piece = i * interval;
1349
1350        if( isComplete || tr_cpPieceIsComplete( tor->completion, piece ) )
1351            tab[i] = -1;
1352        else if( peerCount ) {
1353            int j;
1354            for( j=0; j<peerCount; ++j )
1355                if( tr_bitfieldHas( peers[j]->have, i ) )
1356                    ++tab[i];
1357        }
1358    }
1359
1360    managerUnlock( manager );
1361}
1362
1363/* Returns the pieces that are available from peers */
1364tr_bitfield*
1365tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1366                        const uint8_t    * torrentHash )
1367{
1368    int i, size;
1369    Torrent * t;
1370    tr_peer ** peers;
1371    tr_bitfield * pieces;
1372    managerLock( manager );
1373
1374    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1375    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1376    peers = getConnectedPeers( t, &size );
1377    for( i=0; i<size; ++i )
1378        tr_bitfieldOr( pieces, peers[i]->have );
1379
1380    managerUnlock( manager );
1381    tr_free( peers );
1382    return pieces;
1383}
1384
1385int
1386tr_peerMgrHasConnections( const tr_peerMgr * manager,
1387                          const uint8_t    * torrentHash )
1388{
1389    int ret;
1390    const Torrent * t;
1391    managerLock( manager );
1392
1393    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1394    ret = t && ( !tr_ptrArrayEmpty( t->peers ) || !tr_ptrArrayEmpty( t->webseeds ) );
1395
1396    managerUnlock( manager );
1397    return ret;
1398}
1399
1400void
1401tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1402                        const uint8_t    * torrentHash,
1403                        int              * setmePeersKnown,
1404                        int              * setmePeersConnected,
1405                        int              * setmeSeedsConnected,
1406                        int              * setmeWebseedsSendingToUs,
1407                        int              * setmePeersSendingToUs,
1408                        int              * setmePeersGettingFromUs,
1409                        int              * setmePeersFrom )
1410{
1411    int i, size;
1412    const Torrent * t;
1413    const tr_peer ** peers;
1414    const tr_webseed ** webseeds;
1415
1416    managerLock( manager );
1417
1418    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1419    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1420
1421    *setmePeersKnown           = tr_ptrArraySize( t->pool );
1422    *setmePeersConnected       = 0;
1423    *setmeSeedsConnected       = 0;
1424    *setmePeersGettingFromUs   = 0;
1425    *setmePeersSendingToUs     = 0;
1426    *setmeWebseedsSendingToUs  = 0;
1427
1428    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1429        setmePeersFrom[i] = 0;
1430
1431    for( i=0; i<size; ++i )
1432    {
1433        const tr_peer * peer = peers[i];
1434        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1435
1436        if( peer->io == NULL ) /* not connected */
1437            continue;
1438
1439        ++*setmePeersConnected;
1440
1441        ++setmePeersFrom[atom->from];
1442
1443        if( clientIsDownloadingFrom( peer ) )
1444            ++*setmePeersSendingToUs;
1445
1446        if( clientIsUploadingTo( peer ) )
1447            ++*setmePeersGettingFromUs;
1448
1449        if( atom->flags & ADDED_F_SEED_FLAG )
1450            ++*setmeSeedsConnected;
1451    }
1452
1453    webseeds = (const tr_webseed **) tr_ptrArrayPeek( t->webseeds, &size );
1454    for( i=0; i<size; ++i )
1455    {
1456        if( tr_webseedIsActive( webseeds[i] ) )
1457            ++*setmeWebseedsSendingToUs;
1458    }
1459
1460    managerUnlock( manager );
1461}
1462
1463float*
1464tr_peerMgrWebSpeeds( const tr_peerMgr * manager,
1465                     const uint8_t    * torrentHash )
1466{
1467    const Torrent * t;
1468    const tr_webseed ** webseeds;
1469    int i;
1470    int webseedCount;
1471    float * ret;
1472
1473    assert( manager );
1474    managerLock( manager );
1475
1476    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1477    webseeds = (const tr_webseed**) tr_ptrArrayPeek( t->webseeds, &webseedCount );
1478    assert( webseedCount == t->tor->info.webseedCount );
1479    ret = tr_new0( float, webseedCount );
1480
1481    for( i=0; i<webseedCount; ++i )
1482        if( !tr_webseedGetSpeed( webseeds[i], &ret[i] ) )
1483            ret[i] = -1.0;
1484
1485    managerUnlock( manager );
1486    return ret;
1487}
1488
1489struct tr_peer_stat *
1490tr_peerMgrPeerStats( const tr_peerMgr  * manager,
1491                     const uint8_t     * torrentHash,
1492                     int               * setmeCount UNUSED )
1493{
1494    int i, size;
1495    const Torrent * t;
1496    tr_peer ** peers;
1497    tr_peer_stat * ret;
1498
1499    assert( manager );
1500    managerLock( manager );
1501
1502    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1503    peers = getConnectedPeers( (Torrent*)t, &size );
1504    ret = tr_new0( tr_peer_stat, size );
1505
1506    for( i=0; i<size; ++i )
1507    {
1508        char * pch;
1509        const tr_peer * peer = peers[i];
1510        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1511        tr_peer_stat * stat = ret + i;
1512
1513        tr_netNtop( &peer->in_addr, stat->addr, sizeof(stat->addr) );
1514        tr_strlcpy( stat->client, (peer->client ? peer->client : ""), sizeof(stat->client) );
1515        stat->port               = peer->port;
1516        stat->from               = atom->from;
1517        stat->progress           = peer->progress;
1518        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1519        stat->uploadToRate       = peer->rateToPeer;
1520        stat->downloadFromRate   = peer->rateToClient;
1521        stat->peerIsChoked       = peer->peerIsChoked;
1522        stat->peerIsInterested   = peer->peerIsInterested;
1523        stat->clientIsChoked     = peer->clientIsChoked;
1524        stat->clientIsInterested = peer->clientIsInterested;
1525        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1526        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1527        stat->isUploadingTo      = clientIsUploadingTo( peer );
1528
1529        pch = stat->flagStr;
1530        if( t->optimistic == peer ) *pch++ = 'O';
1531        if( stat->isDownloadingFrom ) *pch++ = 'D';
1532        else if( stat->clientIsInterested ) *pch++ = 'd';
1533        if( stat->isUploadingTo ) *pch++ = 'U';
1534        else if( stat->peerIsInterested ) *pch++ = 'u';
1535        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
1536        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1537        if( stat->isEncrypted ) *pch++ = 'E';
1538        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1539        if( stat->isIncoming ) *pch++ = 'I';
1540        *pch = '\0';
1541    }
1542
1543    *setmeCount = size;
1544    tr_free( peers );
1545
1546    managerUnlock( manager );
1547    return ret;
1548}
1549
1550/**
1551***
1552**/
1553
1554struct ChokeData
1555{
1556    unsigned int  doUnchoke     : 1;
1557    unsigned int  isInterested  : 1;
1558    tr_peer * peer;
1559};
1560
1561static int
1562tr_compareDouble( double a, double b )
1563{
1564    if( a < b ) return -1;
1565    if( a > b ) return 1;
1566    return 0;
1567}
1568
1569static int
1570compareChoke( const void * va, const void * vb )
1571{
1572    const struct ChokeData * a = va;
1573    const struct ChokeData * b = vb;
1574    int diff = 0;
1575
1576    if( diff == 0 ) /* prefer higher dl speeds */
1577        diff = -tr_compareDouble( a->peer->rateToClient, b->peer->rateToClient );
1578    if( diff == 0 ) /* prefer higher ul speeds */
1579        diff = -tr_compareDouble( a->peer->rateToPeer, b->peer->rateToPeer );
1580    if( diff == 0 ) /* prefer unchoked */
1581        diff = (int)a->peer->peerIsChoked - (int)b->peer->peerIsChoked;
1582
1583    return diff;
1584}
1585
1586static int
1587isNew( const tr_peer * peer )
1588{
1589    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1590}
1591
1592static int
1593isSame( const tr_peer * peer )
1594{
1595    return peer && peer->client && strstr( peer->client, "Transmission" );
1596}
1597
1598/**
1599***
1600**/
1601
1602static void
1603rechoke( Torrent * t )
1604{
1605    int i, peerCount, size, unchokedInterested;
1606    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1607    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1608
1609    assert( torrentIsLocked( t ) );
1610   
1611    /* sort the peers by preference and rate */
1612    for( i=0, size=0; i<peerCount; ++i )
1613    {
1614        tr_peer * peer = peers[i];
1615        if( peer->progress >= 1.0 ) /* choke all seeds */
1616            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1617        else {
1618            struct ChokeData * node = &choke[size++];
1619            node->peer = peer;
1620            node->isInterested = peer->peerIsInterested;
1621        }
1622    }
1623
1624    qsort( choke, size, sizeof(struct ChokeData), compareChoke );
1625
1626    /**
1627     * Reciprocation and number of uploads capping is managed by unchoking
1628     * the N peers which have the best upload rate and are interested.
1629     * This maximizes the client's download rate. These N peers are
1630     * referred to as downloaders, because they are interested in downloading
1631     * from the client.
1632     *
1633     * Peers which have a better upload rate (as compared to the downloaders)
1634     * but aren't interested get unchoked. If they become interested, the
1635     * downloader with the worst upload rate gets choked. If a client has
1636     * a complete file, it uses its upload rate rather than its download
1637     * rate to decide which peers to unchoke.
1638     */
1639    unchokedInterested = 0;
1640    for( i=0; i<size && unchokedInterested<MAX_UNCHOKED_PEERS; ++i ) {
1641        choke[i].doUnchoke = 1;
1642        if( choke[i].isInterested )
1643            ++unchokedInterested;
1644    }
1645
1646    /* optimistic unchoke */
1647    if( i < size )
1648    {
1649        int n;
1650        struct ChokeData * c;
1651        tr_ptrArray * randPool = tr_ptrArrayNew( );
1652
1653        for( ; i<size; ++i )
1654        {
1655            if( choke[i].isInterested )
1656            {
1657                const tr_peer * peer = choke[i].peer;
1658                int x=1, y;
1659                if( isNew( peer ) ) x *= 3;
1660                if( isSame( peer ) ) x *= 3;
1661                for( y=0; y<x; ++y )
1662                    tr_ptrArrayAppend( randPool, &choke[i] );
1663            }
1664        }
1665
1666        if(( n = tr_ptrArraySize( randPool )))
1667        {
1668            c = tr_ptrArrayNth( randPool, tr_stupidRandInt( n ));
1669            c->doUnchoke = 1;
1670            t->optimistic = c->peer;
1671        }
1672
1673        tr_ptrArrayFree( randPool, NULL );
1674    }
1675
1676    for( i=0; i<size; ++i )
1677        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1678
1679    /* cleanup */
1680    tr_free( choke );
1681    tr_free( peers );
1682}
1683
1684static int
1685rechokePulse( void * vtorrent )
1686{
1687    Torrent * t = vtorrent;
1688    torrentLock( t );
1689    rechoke( t );
1690    torrentUnlock( t );
1691    return TRUE;
1692}
1693
1694/***
1695****
1696****  Life and Death
1697****
1698***/
1699
1700static int
1701shouldPeerBeClosed( const Torrent * t, const tr_peer * peer, int peerCount )
1702{
1703    const tr_torrent * tor = t->tor;
1704    const time_t now = time( NULL );
1705    const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1706
1707    /* if it's marked for purging, close it */
1708    if( peer->doPurge ) {
1709        tordbg( t, "purging peer %s because its doPurge flag is set", tr_peerIoAddrStr(&atom->addr,atom->port) );
1710        return TRUE;
1711    }
1712
1713    /* if we're seeding and the peer has everything we have,
1714     * and enough time has passed for a pex exchange, then disconnect */
1715    if( tr_torrentIsSeed( tor ) ) {
1716        int peerHasEverything;
1717        if( atom->flags & ADDED_F_SEED_FLAG )
1718            peerHasEverything = TRUE;
1719        else if( peer->progress < tr_cpPercentDone( tor->completion ) )
1720            peerHasEverything = FALSE;
1721        else {
1722            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( tor->completion ) );
1723            tr_bitfieldDifference( tmp, peer->have );
1724            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
1725            tr_bitfieldFree( tmp );
1726        }
1727        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30) ) ) {
1728            tordbg( t, "purging peer %s because we're both seeds", tr_peerIoAddrStr(&atom->addr,atom->port) );
1729            return TRUE;
1730        }
1731    }
1732
1733    /* disconnect if it's been too long since piece data has been transferred.
1734     * this is on a sliding scale based on number of available peers... */
1735    {
1736        const int relaxStrictnessIfFewerThanN = (int)((getMaxPeerCount(tor) * 0.9) + 0.5);
1737        /* if we have >= relaxIfFewerThan, strictness is 100%.
1738         * if we have zero connections, strictness is 0% */
1739        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
1740            ? 1.0
1741            : peerCount / (float)relaxStrictnessIfFewerThanN;
1742        const int lo = MIN_UPLOAD_IDLE_SECS;
1743        const int hi = MAX_UPLOAD_IDLE_SECS;
1744        const int limit = lo + ((hi-lo) * strictness);
1745        const time_t then = peer->pieceDataActivityDate;
1746        const int idleTime = then ? (now-then) : 0;
1747        if( idleTime > limit ) {
1748            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
1749                       tr_peerIoAddrStr(&atom->addr,atom->port), idleTime );
1750            return TRUE;
1751        }
1752    }
1753
1754    return FALSE;
1755}
1756
1757static tr_peer **
1758getPeersToClose( Torrent * t, int * setmeSize )
1759{
1760    int i, peerCount, outsize;
1761    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( t->peers, &peerCount );
1762    struct tr_peer ** ret = tr_new( tr_peer*, peerCount );
1763
1764    assert( torrentIsLocked( t ) );
1765
1766    for( i=outsize=0; i<peerCount; ++i )
1767        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
1768            ret[outsize++] = peers[i];
1769
1770    *setmeSize = outsize;
1771    return ret;
1772}
1773
1774static int
1775compareCandidates( const void * va, const void * vb )
1776{
1777    const struct peer_atom * a = * (const struct peer_atom**) va;
1778    const struct peer_atom * b = * (const struct peer_atom**) vb;
1779
1780    /* <Charles> Here we would probably want to try reconnecting to
1781     * peers that had most recently given us data. Lots of users have
1782     * trouble with resets due to their routers and/or ISPs. This way we
1783     * can quickly recover from an unwanted reset. So we sort
1784     * piece_data_time in descending order.
1785     */
1786
1787    if( a->piece_data_time != b->piece_data_time )
1788        return a->piece_data_time < b->piece_data_time ? 1 : -1;
1789
1790    if( a->numFails != b->numFails )
1791        return a->numFails < b->numFails ? -1 : 1;
1792
1793    if( a->time != b->time )
1794        return a->time < b->time ? -1 : 1;
1795
1796    return 0;
1797}
1798
1799static int
1800getReconnectIntervalSecs( const struct peer_atom * atom )
1801{
1802    int sec;
1803
1804    switch( atom->numFails )
1805    {
1806        case 0: sec = 0; break;
1807        case 1: sec = 5; break;
1808        case 2: sec = 2*60; break;
1809        case 3: sec = 15*60; break;
1810        case 4: sec = 30*60; break;
1811        case 5: sec = 60*60; break;
1812        default: sec = 120*60; break;
1813    }
1814
1815    return sec;
1816}
1817
1818static struct peer_atom **
1819getPeerCandidates( Torrent * t, int * setmeSize )
1820{
1821    int i, atomCount, retCount;
1822    struct peer_atom ** atoms;
1823    struct peer_atom ** ret;
1824    const time_t now = time( NULL );
1825    const int seed = tr_torrentIsSeed( t->tor );
1826
1827    assert( torrentIsLocked( t ) );
1828
1829    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &atomCount );
1830    ret = tr_new( struct peer_atom*, atomCount );
1831    for( i=retCount=0; i<atomCount; ++i )
1832    {
1833        struct peer_atom * atom = atoms[i];
1834
1835        /* peer fed us too much bad data ... we only keep it around
1836         * now to weed it out in case someone sends it to us via pex */
1837        if( atom->myflags & MYFLAG_BANNED )
1838            continue;
1839
1840        /* peer was unconnectable before, so we're not going to keep trying.
1841         * this is needs a separate flag from `banned', since if they try
1842         * to connect to us later, we'll let them in */
1843        if( atom->myflags & MYFLAG_UNREACHABLE )
1844            continue;
1845
1846        /* we don't need two connections to the same peer... */
1847        if( peerIsInUse( t, &atom->addr ) )
1848            continue;
1849
1850        /* no need to connect if we're both seeds... */
1851        if( seed && (atom->flags & ADDED_F_SEED_FLAG) )
1852            continue;
1853
1854        /* If we were connected to this peer recently and transferring
1855         * piece data, try to reconnect -- network troubles may have
1856         * disconnected us.  but if we weren't sharing piece data,
1857         * hold off on this peer to give another one a try instead */
1858        if( ( now - atom->piece_data_time ) > 10 )
1859        {
1860            const int wait = getReconnectIntervalSecs( atom );
1861            if( ( now - atom->time ) < wait ) {
1862                tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
1863                        i, tr_peerIoAddrStr(&atom->addr,atom->port), wait );
1864                continue;
1865            }
1866        }
1867
1868        /* Don't connect to peers in our blocklist */
1869        if( tr_sessionIsAddressBlocked( t->manager->handle, &atom->addr ) )
1870            continue;
1871
1872        ret[retCount++] = atom;
1873    }
1874
1875    qsort( ret, retCount, sizeof(struct peer_atom*), compareCandidates );
1876    *setmeSize = retCount;
1877    return ret;
1878}
1879
1880static int
1881reconnectPulse( void * vtorrent )
1882{
1883    Torrent * t = vtorrent;
1884    static time_t prevTime = 0;
1885    static int newConnectionsThisSecond = 0;
1886    time_t now;
1887
1888    torrentLock( t );
1889
1890    now = time( NULL );
1891    if( prevTime != now )
1892    {
1893        prevTime = now;
1894        newConnectionsThisSecond = 0;
1895    }
1896
1897    if( !t->isRunning )
1898    {
1899        removeAllPeers( t );
1900    }
1901    else
1902    {
1903        int i, nCandidates, nBad;
1904        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
1905        struct tr_peer ** connections = getPeersToClose( t, &nBad );
1906
1907        if( nBad || nCandidates )
1908            tordbg( t, "reconnect pulse for [%s]: %d bad connections, "
1909                       "%d connection candidates, %d atoms, max per pulse is %d",
1910                       t->tor->info.name, nBad, nCandidates,
1911                       tr_ptrArraySize(t->pool),
1912                       (int)MAX_RECONNECTIONS_PER_PULSE );
1913
1914        /* disconnect some peers.
1915           if we transferred piece data, then they might be good peers,
1916           so reset their `numFails' weight to zero.  otherwise we connected
1917           to them fruitlessly, so mark it as another fail */
1918        for( i=0; i<nBad; ++i ) {
1919            tr_peer * peer = connections[i];
1920            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1921            if( peer->pieceDataActivityDate )
1922                atom->numFails = 0;
1923            else
1924                ++atom->numFails;
1925            tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
1926            removePeer( t, peer );
1927        }
1928
1929        /* add some new ones */
1930        for( i=0;    i < nCandidates
1931                  && i < MAX_RECONNECTIONS_PER_PULSE
1932                  && newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND; ++i )
1933        {
1934            tr_peerMgr * mgr = t->manager;
1935            struct peer_atom * atom = candidates[i];
1936            tr_peerIo * io;
1937
1938            tordbg( t, "Starting an OUTGOING connection with %s",
1939                       tr_peerIoAddrStr( &atom->addr, atom->port ) );
1940
1941            io = tr_peerIoNewOutgoing( mgr->handle, &atom->addr, atom->port, t->hash );
1942            if( io == NULL )
1943            {
1944                atom->myflags |= MYFLAG_UNREACHABLE;
1945            }
1946            else
1947            {
1948                tr_handshake * handshake = tr_handshakeNew( io,
1949                                                            mgr->handle->encryptionMode,
1950                                                            myHandshakeDoneCB,
1951                                                            mgr );
1952
1953                assert( tr_peerIoGetTorrentHash( io ) );
1954
1955                ++newConnectionsThisSecond;
1956
1957                tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake, handshakeCompare );
1958            }
1959
1960            atom->time = time( NULL );
1961        }
1962
1963        /* cleanup */
1964        tr_free( connections );
1965        tr_free( candidates );
1966    }
1967
1968    torrentUnlock( t );
1969    return TRUE;
1970}
Note: See TracBrowser for help on using the repository browser.