source: trunk/libtransmission/peer-mgr.c @ 6616

Last change on this file since 6616 was 6616, checked in by charles, 13 years ago

(libT) remove some dead functions: tr_calloc(), tr_compareUint16(), tr_compareUint32()

  • Property svn:keywords set to Date Rev Author Id
File size: 55.1 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 6616 2008-08-21 19:03:56Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp, strstr */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "blocklist.h"
23#include "clients.h"
24#include "completion.h"
25#include "crypto.h"
26#include "handshake.h"
27#include "inout.h" /* tr_ioTestPiece */
28#include "net.h"
29#include "peer-io.h"
30#include "peer-mgr.h"
31#include "peer-mgr-private.h"
32#include "peer-msgs.h"
33#include "ptrarray.h"
34#include "ratecontrol.h"
35#include "stats.h" /* tr_statsAddDownloaded */
36#include "torrent.h"
37#include "trevent.h"
38#include "utils.h"
39#include "webseed.h"
40
41enum
42{
43    /* how frequently to change which peers are choked */
44    RECHOKE_PERIOD_MSEC = (10 * 1000),
45
46    /* minimum interval for refilling peers' request lists */
47    REFILL_PERIOD_MSEC = 333,
48
49    /* when many peers are available, keep idle ones this long */
50    MIN_UPLOAD_IDLE_SECS = (60 * 3),
51
52    /* when few peers are available, keep idle ones this long */
53    MAX_UPLOAD_IDLE_SECS = (60 * 10),
54
55    /* how frequently to decide which peers live and die */
56    RECONNECT_PERIOD_MSEC = (2 * 1000),
57
58    /* max # of peers to ask fer per torrent per reconnect pulse */
59    MAX_RECONNECTIONS_PER_PULSE = 2,
60
61    /* max number of peers to ask for per second overall.
62     * this throttle is to avoid overloading the router */
63    MAX_CONNECTIONS_PER_SECOND = 4,
64
65    /* number of unchoked peers per torrent.
66     * FIXME: this probably ought to be configurable */
67    MAX_UNCHOKED_PEERS = 12,
68
69    /* number of bad pieces a peer is allowed to send before we ban them */
70    MAX_BAD_PIECES_PER_PEER = 3,
71
72    /* use for bitwise operations w/peer_atom.myflags */
73    MYFLAG_BANNED = 1,
74
75    /* unreachable for now... but not banned.  if they try to connect to us it's okay */
76    MYFLAG_UNREACHABLE = 2
77};
78
79
80/**
81***
82**/
83
84/* We keep one of these for every peer we know about, whether
85 * it's connected or not, so the struct must be small.
86 * When our current connections underperform, we dip back
87 * into this list for new ones. */
88struct peer_atom
89{   
90    uint8_t from;
91    uint8_t flags; /* these match the added_f flags */
92    uint8_t myflags; /* flags that aren't defined in added_f */
93    uint16_t port;
94    uint16_t numFails;
95    struct in_addr addr;
96    time_t time;
97    time_t piece_data_time;
98};
99
100typedef struct
101{
102    uint8_t hash[SHA_DIGEST_LENGTH];
103    tr_ptrArray * outgoingHandshakes; /* tr_handshake */
104    tr_ptrArray * pool; /* struct peer_atom */
105    tr_ptrArray * peers; /* tr_peer */
106    tr_ptrArray * webseeds; /* tr_webseed */
107    tr_timer * reconnectTimer;
108    tr_timer * rechokeTimer;
109    tr_timer * refillTimer;
110    tr_torrent * tor;
111    tr_peer * optimistic; /* the optimistic peer, or NULL if none */
112    tr_bitfield * requestedPieces;
113
114    unsigned int isRunning : 1;
115
116    struct tr_peerMgr * manager;
117}
118Torrent;
119
120struct tr_peerMgr
121{
122    tr_handle * handle;
123    tr_ptrArray * torrents; /* Torrent */
124    tr_ptrArray * incomingHandshakes; /* tr_handshake */
125};
126
127#define tordbg(t, fmt...) tr_deepLog( __FILE__, __LINE__, t->tor->info.name, ##fmt )
128
129/**
130***
131**/
132
133/* The following is not a very high quality random number generator.  It
134 * is mainly used as a simple stupid random number generator inside some
135 * functions below. Please don't use it for anything else unless you
136 * know what you're doing. Use tr_cryptoRandInt() instead.
137 */
138
139static int
140tr_stupidRandInt( int sup )
141{
142    static int init = 0;
143    assert( sup > 0 );
144
145    if ( !init )
146    {
147        srand( tr_date() );
148        init = 1;
149    }
150
151    return rand() % sup;
152}
153
154static void
155managerLock( const struct tr_peerMgr * manager )
156{
157    tr_globalLock( manager->handle );
158}
159static void
160managerUnlock( const struct tr_peerMgr * manager )
161{
162    tr_globalUnlock( manager->handle );
163}
164static void
165torrentLock( Torrent * torrent )
166{
167    managerLock( torrent->manager );
168}
169static void
170torrentUnlock( Torrent * torrent )
171{
172    managerUnlock( torrent->manager );
173}
174static int
175torrentIsLocked( const Torrent * t )
176{
177    return tr_globalIsLocked( t->manager->handle );
178}
179
180/**
181***
182**/
183
184static int
185compareAddresses( const struct in_addr * a, const struct in_addr * b )
186{
187    return a->s_addr - b->s_addr;
188}
189
190static int
191handshakeCompareToAddr( const void * va, const void * vb )
192{
193    const tr_handshake * a = va;
194    return compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
195}
196
197static int
198handshakeCompare( const void * a, const void * b )
199{
200    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
201}
202
203static tr_handshake*
204getExistingHandshake( tr_ptrArray * handshakes, const struct in_addr * in_addr )
205{
206    return tr_ptrArrayFindSorted( handshakes,
207                                  in_addr,
208                                  handshakeCompareToAddr );
209}
210
211static int
212comparePeerAtomToAddress( const void * va, const void * vb )
213{
214    const struct peer_atom * a = va;
215    return compareAddresses( &a->addr, vb );
216}
217
218static int
219comparePeerAtoms( const void * va, const void * vb )
220{
221    const struct peer_atom * b = vb;
222    return comparePeerAtomToAddress( va, &b->addr );
223}
224
225/**
226***
227**/
228
229static int
230torrentCompare( const void * va, const void * vb )
231{
232    const Torrent * a = va;
233    const Torrent * b = vb;
234    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
235}
236
237static int
238torrentCompareToHash( const void * va, const void * vb )
239{
240    const Torrent * a = va;
241    const uint8_t * b_hash = vb;
242    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
243}
244
245static Torrent*
246getExistingTorrent( tr_peerMgr * manager, const uint8_t * hash )
247{
248    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
249                                             hash,
250                                             torrentCompareToHash );
251}
252
253static int
254peerCompare( const void * va, const void * vb )
255{
256    const tr_peer * a = va;
257    const tr_peer * b = vb;
258    return compareAddresses( &a->in_addr, &b->in_addr );
259}
260
261static int
262peerCompareToAddr( const void * va, const void * vb )
263{
264    const tr_peer * a = va;
265    return compareAddresses( &a->in_addr, vb );
266}
267
268static tr_peer*
269getExistingPeer( Torrent * torrent, const struct in_addr * in_addr )
270{
271    assert( torrentIsLocked( torrent ) );
272    assert( in_addr );
273
274    return tr_ptrArrayFindSorted( torrent->peers,
275                                  in_addr,
276                                  peerCompareToAddr );
277}
278
279static struct peer_atom*
280getExistingAtom( const Torrent * t, const struct in_addr * addr )
281{
282    assert( torrentIsLocked( t ) );
283    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
284}
285
286static int
287peerIsInUse( const Torrent * ct, const struct in_addr * addr )
288{
289    Torrent * t = (Torrent*) ct;
290
291    assert( torrentIsLocked ( t ) );
292
293    return getExistingPeer( t, addr )
294        || getExistingHandshake( t->outgoingHandshakes, addr )
295        || getExistingHandshake( t->manager->incomingHandshakes, addr );
296}
297
298static tr_peer*
299peerConstructor( const struct in_addr * in_addr )
300{
301    tr_peer * p;
302    p = tr_new0( tr_peer, 1 );
303    p->rcToClient = tr_rcInit( );
304    p->rcToPeer = tr_rcInit( );
305    memcpy( &p->in_addr, in_addr, sizeof(struct in_addr) );
306    return p;
307}
308
309static tr_peer*
310getPeer( Torrent * torrent, const struct in_addr * in_addr )
311{
312    tr_peer * peer;
313
314    assert( torrentIsLocked( torrent ) );
315
316    peer = getExistingPeer( torrent, in_addr );
317
318    if( peer == NULL ) {
319        peer = peerConstructor( in_addr );
320        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
321    }
322
323    return peer;
324}
325
326static void
327peerDestructor( tr_peer * peer )
328{
329    assert( peer );
330    assert( peer->msgs );
331
332    tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
333    tr_peerMsgsFree( peer->msgs );
334
335    tr_peerIoFree( peer->io );
336
337    tr_bitfieldFree( peer->have );
338    tr_bitfieldFree( peer->blame );
339    tr_rcClose( peer->rcToClient );
340    tr_rcClose( peer->rcToPeer );
341    tr_free( peer->client );
342    tr_free( peer );
343}
344
345static void
346removePeer( Torrent * t, tr_peer * peer )
347{
348    tr_peer * removed;
349    struct peer_atom * atom;
350
351    assert( torrentIsLocked( t ) );
352
353    atom = getExistingAtom( t, &peer->in_addr );
354    assert( atom );
355    atom->time = time( NULL );
356
357    removed = tr_ptrArrayRemoveSorted( t->peers, peer, peerCompare );
358    assert( removed == peer );
359    peerDestructor( removed );
360}
361
362static void
363removeAllPeers( Torrent * t )
364{
365    while( !tr_ptrArrayEmpty( t->peers ) )
366        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
367}
368
369static void
370torrentDestructor( Torrent * t )
371{
372    uint8_t hash[SHA_DIGEST_LENGTH];
373
374    assert( t );
375    assert( !t->isRunning );
376    assert( t->peers );
377    assert( torrentIsLocked( t ) );
378    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
379    assert( tr_ptrArrayEmpty( t->peers ) );
380
381    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
382
383    tr_timerFree( &t->reconnectTimer );
384    tr_timerFree( &t->rechokeTimer );
385    tr_timerFree( &t->refillTimer );
386
387    tr_bitfieldFree( t->requestedPieces );
388    tr_ptrArrayFree( t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
389    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
390    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
391    tr_ptrArrayFree( t->peers, NULL );
392
393    tr_free( t );
394}
395
396static void peerCallbackFunc( void * vpeer, void * vevent, void * vt );
397
398static Torrent*
399torrentConstructor( tr_peerMgr * manager, tr_torrent * tor )
400{
401    int i;
402    Torrent * t;
403
404    t = tr_new0( Torrent, 1 );
405    t->manager = manager;
406    t->tor = tor;
407    t->pool = tr_ptrArrayNew( );
408    t->peers = tr_ptrArrayNew( );
409    t->webseeds = tr_ptrArrayNew( );
410    t->outgoingHandshakes = tr_ptrArrayNew( );
411    t->requestedPieces = tr_bitfieldNew( tor->info.pieceCount );
412    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
413
414    for( i=0; i<tor->info.webseedCount; ++i ) {
415        tr_webseed * w = tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
416        tr_ptrArrayAppend( t->webseeds, w );
417    }
418
419    return t;
420}
421
422/**
423 * For explanation, see http://www.bittorrent.org/fast_extensions.html
424 * Also see the "test-allowed-set" unit test
425 */
426struct tr_bitfield *
427tr_peerMgrGenerateAllowedSet( const uint32_t         k,         /* number of pieces in set */
428                              const uint32_t         sz,        /* number of pieces in torrent */
429                              const uint8_t        * infohash,  /* torrent's SHA1 hash*/
430                              const struct in_addr * ip )       /* peer's address */
431{
432    uint8_t w[SHA_DIGEST_LENGTH + 4];
433    uint8_t x[SHA_DIGEST_LENGTH];
434    tr_bitfield_t * a;
435    uint32_t a_size;
436
437    *(uint32_t*)w = ntohl( htonl(ip->s_addr) & 0xffffff00 );   /* (1) */
438    memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );              /* (2) */
439    tr_sha1( x, w, sizeof( w ), NULL );                        /* (3) */
440
441    a = tr_bitfieldNew( sz );
442    a_size = 0;
443   
444    while( a_size < k )
445    {
446        int i;
447        for ( i=0; i<5 && a_size<k; ++i )                      /* (4) */
448        {
449            uint32_t j = i * 4;                                /* (5) */
450            uint32_t y = ntohl(*(uint32_t*)(x+j));             /* (6) */
451            uint32_t index = y % sz;                           /* (7) */
452            if ( !tr_bitfieldHas( a, index ) ) {               /* (8) */
453                tr_bitfieldAdd( a, index );                    /* (9) */
454                ++a_size;
455            }
456        }
457        tr_sha1( x, x, sizeof( x ), NULL );                    /* (3) */
458    }
459   
460    return a;
461}
462
463tr_peerMgr*
464tr_peerMgrNew( tr_handle * handle )
465{
466    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
467    m->handle = handle;
468    m->torrents = tr_ptrArrayNew( );
469    m->incomingHandshakes = tr_ptrArrayNew( );
470    return m;
471}
472
473void
474tr_peerMgrFree( tr_peerMgr * manager )
475{
476    managerLock( manager );
477
478    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
479     * the item from manager->handshakes, so this is a little roundabout... */
480    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
481        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
482    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
483
484    /* free the torrents. */
485    tr_ptrArrayFree( manager->torrents, (PtrArrayForeachFunc)torrentDestructor );
486
487    managerUnlock( manager );
488    tr_free( manager );
489}
490
491static tr_peer**
492getConnectedPeers( Torrent * t, int * setmeCount )
493{
494    int i, peerCount, connectionCount;
495    tr_peer **peers;
496    tr_peer **ret;
497
498    assert( torrentIsLocked( t ) );
499
500    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
501    ret = tr_new( tr_peer*, peerCount );
502
503    for( i=connectionCount=0; i<peerCount; ++i )
504        if( peers[i]->msgs )
505            ret[connectionCount++] = peers[i];
506
507    *setmeCount = connectionCount;
508    return ret;
509}
510
511static int
512clientIsDownloadingFrom( const tr_peer * peer )
513{
514    return peer->clientIsInterested && !peer->clientIsChoked;
515}
516
517static int
518clientIsUploadingTo( const tr_peer * peer )
519{
520    return peer->peerIsInterested && !peer->peerIsChoked;
521}
522
523/***
524****
525***/
526
527int
528tr_peerMgrPeerIsSeed( const tr_peerMgr       * mgr,
529                      const uint8_t          * torrentHash,
530                      const struct in_addr   * addr )
531{
532    int isSeed = FALSE;
533    const Torrent * t = NULL;
534    const struct peer_atom * atom = NULL;
535
536    t = getExistingTorrent( (tr_peerMgr*)mgr, torrentHash );
537    if( t )
538        atom = getExistingAtom( t, addr );
539    if( atom )
540        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
541
542    return isSeed;
543}
544
545/***
546****  Refill
547***/
548
549struct tr_refill_piece
550{
551    tr_priority_t priority;
552    int missingBlockCount;
553    int random;
554    uint32_t piece;
555    uint32_t peerCount;
556};
557
558static int
559compareRefillPiece (const void * aIn, const void * bIn)
560{
561    const struct tr_refill_piece * a = aIn;
562    const struct tr_refill_piece * b = bIn;
563
564    /* if one piece has a higher priority, it goes first */
565    if( a->priority != b->priority )
566        return a->priority > b->priority ? -1 : 1;
567   
568    /* otherwise if one has fewer peers, it goes first */
569    if (a->peerCount != b->peerCount)
570        return a->peerCount < b->peerCount ? -1 : 1;
571
572    /* otherwise go with our random seed */
573    return a->random - b->random;
574}
575
576static int
577isPieceInteresting( const tr_torrent  * tor,
578                    tr_piece_index_t    piece )
579{
580    if( tor->info.pieces[piece].dnd ) /* we don't want it */
581        return 0;
582
583    if( tr_cpPieceIsComplete( tor->completion, piece ) ) /* we have it */
584        return 0;
585
586    return 1;
587}
588
589static uint32_t*
590getPreferredPieces( Torrent     * t,
591                    uint32_t    * pieceCount )
592{
593    const tr_torrent * tor = t->tor;
594    const tr_info * inf = &tor->info;
595    tr_piece_index_t i;
596    uint32_t poolSize = 0;
597    uint32_t * pool = tr_new( uint32_t, inf->pieceCount );
598    int peerCount;
599    tr_peer** peers;
600
601    assert( torrentIsLocked( t ) );
602
603    peers = getConnectedPeers( t, &peerCount );
604
605    for( i=0; i<inf->pieceCount; ++i )
606        if( isPieceInteresting( tor, i ) )
607            pool[poolSize++] = i;
608
609    /* sort the pool from most interesting to least... */
610    if( poolSize > 1 )
611    {
612        uint32_t j;
613        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
614
615        for( j=0; j<poolSize; ++j )
616        {
617            int k;
618            const tr_piece_index_t piece = pool[j];
619            struct tr_refill_piece * setme = p + j;
620
621            setme->piece = piece;
622            setme->priority = inf->pieces[piece].priority;
623            setme->peerCount = 0;
624            setme->random = tr_stupidRandInt( INT_MAX );
625            setme->missingBlockCount = tr_cpMissingBlocksInPiece( tor->completion, piece );
626
627            for( k=0; k<peerCount; ++k ) {
628                const tr_peer * peer = peers[k];
629                if( peer->peerIsInterested && !peer->clientIsChoked && tr_bitfieldHas( peer->have, piece ) )
630                    ++setme->peerCount;
631            }
632        }
633
634        qsort( p, poolSize, sizeof(struct tr_refill_piece), compareRefillPiece );
635
636        for( j=0; j<poolSize; ++j )
637            pool[j] = p[j].piece;
638
639        tr_free( p );
640    }
641
642    tr_free( peers );
643
644    *pieceCount = poolSize;
645    return pool;
646}
647
648static tr_peer**
649getPeersUploadingToClient( Torrent * t, int * setmeCount )
650{
651    int i;
652    int peerCount = 0;
653    int retCount = 0;
654    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
655    tr_peer ** ret = tr_new( tr_peer*, peerCount );
656
657    /* get a list of peers we're downloading from */
658    for( i=0; i<peerCount; ++i )
659        if( clientIsDownloadingFrom( peers[i] ) )
660            ret[retCount++] = peers[i];
661
662    /* pick a different starting point each time so all peers
663     * get a chance at the first blocks in the queue */
664    if( retCount ) {
665        tr_peer ** tmp = tr_new( tr_peer*, retCount );
666        i = tr_stupidRandInt( retCount );
667        memcpy( tmp, ret, sizeof(tr_peer*) * retCount );
668        memcpy( ret, tmp+i, sizeof(tr_peer*) * (retCount-i) );
669        memcpy( ret+(retCount-i), tmp, sizeof(tr_peer*) * i );
670        tr_free( tmp );
671    }
672
673    *setmeCount = retCount;
674    return ret;
675}
676
677static int
678refillPulse( void * vtorrent )
679{
680    Torrent * t = vtorrent;
681    tr_torrent * tor = t->tor;
682    int peerCount;
683    int webseedCount;
684    tr_peer ** peers;
685    tr_webseed ** webseeds;
686    uint32_t pieceCount;
687    uint32_t * pieces;
688    tr_piece_index_t i;
689
690    if( !t->isRunning )
691        return TRUE;
692    if( tr_torrentIsSeed( t->tor ) )
693        return TRUE;
694
695    torrentLock( t );
696    tordbg( t, "Refilling Request Buffers..." );
697
698    pieces = getPreferredPieces( t, &pieceCount );
699    peers = getPeersUploadingToClient( t, &peerCount );
700    webseedCount = tr_ptrArraySize( t->webseeds );
701    webseeds = tr_memdup( tr_ptrArrayBase(t->webseeds), webseedCount*sizeof(tr_webseed*) );
702
703    for( i=0; (webseedCount || peerCount) && i<pieceCount; ++i )
704    {
705        int j;
706        int handled = FALSE;
707        const tr_piece_index_t piece = pieces[i];
708
709        assert( piece < tor->info.pieceCount );
710
711        /* find a peer who can ask for this piece */
712        for( j=0; !handled && j<peerCount; )
713        {
714            const int val = tr_peerMsgsAddRequest( peers[j]->msgs, piece );
715            switch( val )
716            {
717                case TR_ADDREQ_FULL: 
718                case TR_ADDREQ_CLIENT_CHOKED:
719                    memmove( peers+j, peers+j+1, sizeof(tr_peer*)*(--peerCount-j) );
720                    break;
721                case TR_ADDREQ_MISSING: 
722                case TR_ADDREQ_DUPLICATE: 
723                    ++j;
724                    break;
725                case TR_ADDREQ_OK:
726                    tr_bitfieldAdd( t->requestedPieces, piece );
727                    handled = TRUE;
728                    break;
729                default:
730                    assert( 0 && "unhandled value" );
731                    break;
732            }
733        }
734
735        /* maybe one of the webseeds can do it */
736        for( j=0; !handled && j<webseedCount; )
737        {
738            const int val = tr_webseedAddRequest( webseeds[j], piece );
739            switch( val )
740            {
741                case TR_ADDREQ_FULL: 
742                    memmove( webseeds+j, webseeds+j+1, sizeof(tr_webseed*)*(--webseedCount-j) );
743                    break;
744                case TR_ADDREQ_OK:
745                    tr_bitfieldAdd( t->requestedPieces, piece );
746                    handled = TRUE;
747                    break;
748                default:
749                    assert( 0 && "unhandled value" );
750                    break;
751            }
752        }
753    }
754
755    /* cleanup */
756    tr_free( webseeds );
757    tr_free( peers );
758    tr_free( pieces );
759
760    t->refillTimer = NULL;
761    torrentUnlock( t );
762    return FALSE;
763}
764
765static void
766addStrike( Torrent * t, tr_peer * peer )
767{
768    tordbg( t, "increasing peer %s strike count to %d", tr_peerIoAddrStr(&peer->in_addr,peer->port), peer->strikes+1 );
769
770    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
771    {
772        struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
773        atom->myflags |= MYFLAG_BANNED;
774        peer->doPurge = 1;
775        tordbg( t, "banning peer %s", tr_peerIoAddrStr(&atom->addr,atom->port) );
776    }
777}
778
779static void
780gotBadPiece( Torrent * t, tr_piece_index_t pieceIndex )
781{
782    tr_torrent * tor = t->tor;
783    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
784    tor->corruptCur += byteCount;
785    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
786}
787
788static void
789refillSoon( Torrent * t )
790{
791    if( t->refillTimer == NULL )
792        t->refillTimer = tr_timerNew( t->manager->handle,
793                                      refillPulse, t,
794                                      REFILL_PERIOD_MSEC );
795}
796
797static void
798peerCallbackFunc( void * vpeer, void * vevent, void * vt )
799{
800    tr_peer * peer = vpeer; /* may be NULL if peer is a webseed */
801    Torrent * t = (Torrent *) vt;
802    const tr_peer_event * e = vevent;
803
804    torrentLock( t );
805
806    switch( e->eventType )
807    {
808        case TR_PEER_NEED_REQ:
809            refillSoon( t );
810            break;
811
812        case TR_PEER_CANCEL:
813            tr_bitfieldRem( t->requestedPieces, e->pieceIndex );
814            break;
815
816        case TR_PEER_PEER_GOT_DATA: {
817            const time_t now = time( NULL );
818            tr_torrent * tor = t->tor;
819            tor->activityDate = now;
820            tor->uploadedCur += e->length;
821            tr_rcTransferred( tor->upload, e->length );
822            tr_rcTransferred( tor->handle->upload, e->length );
823            tr_statsAddUploaded( tor->handle, e->length );
824            if( peer ) {
825                struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
826                atom->piece_data_time = time( NULL );
827            }
828            break;
829        }
830
831        case TR_PEER_CLIENT_GOT_DATA: {
832            const time_t now = time( NULL );
833            tr_torrent * tor = t->tor;
834            tor->activityDate = now;
835            /* only add this to downloadedCur if we got it from a peer --
836             * webseeds shouldn't count against our ratio.  As one tracker
837             * admin put it, "Those pieces are downloaded directly from the
838             * content distributor, not the peers, it is the tracker's job
839             * to manage the swarms, not the web server and does not fit
840             * into the jurisdiction of the tracker." */
841            if( peer )
842                tor->downloadedCur += e->length;
843            tr_rcTransferred( tor->download, e->length );
844            tr_rcTransferred( tor->handle->download, e->length );
845            tr_statsAddDownloaded( tor->handle, e->length );
846            if( peer ) {
847                struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
848                atom->piece_data_time = time( NULL );
849            }
850            break;
851        }
852
853        case TR_PEER_PEER_PROGRESS: {
854            if( peer ) {
855                struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
856                const int peerIsSeed = e->progress >= 1.0;
857                if( peerIsSeed ) {
858                    tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr(&atom->addr,atom->port) );
859                    atom->flags |= ADDED_F_SEED_FLAG;
860                } else {
861                    tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr(&atom->addr,atom->port) );
862                    atom->flags &= ~ADDED_F_SEED_FLAG;
863                }
864            }
865            break;
866        }
867
868        case TR_PEER_CLIENT_GOT_BLOCK:
869        {
870            tr_torrent * tor = t->tor;
871
872            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
873
874            tr_cpBlockAdd( tor->completion, block );
875
876            if( tr_cpPieceIsComplete( tor->completion, e->pieceIndex ) )
877            {
878                const tr_piece_index_t p = e->pieceIndex;
879                const tr_errno err = tr_ioTestPiece( tor, p );
880
881                if( err ) {
882                    tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test: %s" ),
883                               (unsigned long)p, tr_errorString( err ) );
884                }
885
886                tr_torrentSetHasPiece( tor, p, !err );
887                tr_torrentSetPieceChecked( tor, p, TRUE );
888                tr_peerMgrSetBlame( tor->handle->peerMgr, tor->info.hash, p, !err );
889
890                if( err )
891                    gotBadPiece( t, p );
892                else {
893                    int i, peerCount;
894                    tr_peer ** peers = getConnectedPeers( t, &peerCount );
895                    for( i=0; i<peerCount; ++i )
896                        tr_peerMsgsHave( peers[i]->msgs, p );
897                    tr_free( peers );
898                }
899
900                tr_torrentRecheckCompleteness( tor );
901            }
902            break;
903        }
904
905        case TR_PEER_ERROR:
906            if( TR_ERROR_IS_IO( e->err ) ) {
907                t->tor->error = e->err;
908                tr_strlcpy( t->tor->errorString, tr_errorString( e->err ), sizeof(t->tor->errorString) );
909                tr_torrentStop( t->tor );
910            } else if( e->err == TR_ERROR_ASSERT ) {
911                addStrike( t, peer );
912            }
913            peer->doPurge = 1;
914            break;
915
916        default:
917            assert(0);
918    }
919
920    torrentUnlock( t );
921}
922
923static void
924ensureAtomExists( Torrent * t, const struct in_addr * addr, uint16_t port, uint8_t flags, uint8_t from )
925{
926    if( getExistingAtom( t, addr ) == NULL )
927    {
928        struct peer_atom * a;
929        a = tr_new0( struct peer_atom, 1 );
930        a->addr = *addr;
931        a->port = port;
932        a->flags = flags;
933        a->from = from;
934        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr(&a->addr,a->port) );
935        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
936    }
937}
938
939static int
940getMaxPeerCount( const tr_torrent * tor UNUSED )
941{
942    return tor->maxConnectedPeers;
943}
944
945/* FIXME: this is kind of a mess. */
946static void
947myHandshakeDoneCB( tr_handshake    * handshake,
948                   tr_peerIo       * io,
949                   int               isConnected,
950                   const uint8_t   * peer_id,
951                   void            * vmanager )
952{
953    int ok = isConnected;
954    uint16_t port;
955    const struct in_addr * addr;
956    tr_peerMgr * manager = (tr_peerMgr*) vmanager;
957    Torrent * t;
958    tr_handshake * ours;
959
960    assert( io );
961    assert( isConnected==0 || isConnected==1 );
962
963    t = tr_peerIoHasTorrentHash( io )
964        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
965        : NULL;
966
967    if( tr_peerIoIsIncoming ( io ) )
968        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
969                                        handshake, handshakeCompare );
970    else if( t )
971        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
972                                        handshake, handshakeCompare );
973    else
974        ours = handshake;
975
976    assert( ours );
977    assert( ours == handshake );
978
979    if( t )
980        torrentLock( t );
981
982    addr = tr_peerIoGetAddress( io, &port );
983
984    if( !ok || !t || !t->isRunning )
985    {
986        if( t ) {
987            struct peer_atom * atom = getExistingAtom( t, addr );
988            if( atom )
989                ++atom->numFails;
990        }
991
992        tr_peerIoFree( io );
993    }
994    else /* looking good */
995    {
996        struct peer_atom * atom;
997        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
998        atom = getExistingAtom( t, addr );
999
1000        if( atom->myflags & MYFLAG_BANNED )
1001        {
1002            tordbg( t, "banned peer %s tried to reconnect", tr_peerIoAddrStr(&atom->addr,atom->port) );
1003            tr_peerIoFree( io );
1004        }
1005        else if( tr_ptrArraySize( t->peers ) >= getMaxPeerCount( t->tor ) )
1006        {
1007            tr_peerIoFree( io );
1008        }
1009        else
1010        {
1011            tr_peer * peer = getExistingPeer( t, addr );
1012
1013            if( peer ) /* we already have this peer */
1014            {
1015                tr_peerIoFree( io );
1016            }
1017            else
1018            {
1019                peer = getPeer( t, addr );
1020                tr_free( peer->client );
1021
1022                if( !peer_id )
1023                    peer->client = NULL;
1024                else {
1025                    char client[128];
1026                    tr_clientForId( client, sizeof( client ), peer_id );
1027                    peer->client = tr_strdup( client );
1028                }
1029                peer->port = port;
1030                peer->io = io;
1031                peer->msgs = tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1032                atom->time = time( NULL );
1033            }
1034        }
1035    }
1036
1037    if( t )
1038        torrentUnlock( t );
1039}
1040
1041void
1042tr_peerMgrAddIncoming( tr_peerMgr      * manager,
1043                       struct in_addr  * addr,
1044                       uint16_t          port,
1045                       int               socket )
1046{
1047    managerLock( manager );
1048
1049    if( tr_sessionIsAddressBlocked( manager->handle, addr ) )
1050    {
1051        tr_dbg( "Banned IP address \"%s\" tried to connect to us",
1052                inet_ntoa( *addr ) );
1053        tr_netClose( socket );
1054    }
1055    else if( getExistingHandshake( manager->incomingHandshakes, addr ) )
1056    {
1057        tr_netClose( socket );
1058    }
1059    else /* we don't have a connetion to them yet... */
1060    {
1061        tr_peerIo * io;
1062        tr_handshake * handshake;
1063
1064        io = tr_peerIoNewIncoming( manager->handle, addr, port, socket );
1065
1066        handshake = tr_handshakeNew( io,
1067                                     manager->handle->encryptionMode,
1068                                     myHandshakeDoneCB,
1069                                     manager );
1070
1071        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake, handshakeCompare );
1072    }
1073
1074    managerUnlock( manager );
1075}
1076
1077void
1078tr_peerMgrAddPex( tr_peerMgr     * manager,
1079                  const uint8_t  * torrentHash,
1080                  uint8_t          from,
1081                  const tr_pex   * pex )
1082{
1083    Torrent * t;
1084    managerLock( manager );
1085
1086    t = getExistingTorrent( manager, torrentHash );
1087    if( !tr_sessionIsAddressBlocked( t->manager->handle, &pex->in_addr ) )
1088        ensureAtomExists( t, &pex->in_addr, pex->port, pex->flags, from );
1089
1090    managerUnlock( manager );
1091}
1092
1093tr_pex *
1094tr_peerMgrCompactToPex( const void     * compact,
1095                        size_t           compactLen,
1096                        const uint8_t  * added_f,
1097                        size_t           added_f_len,
1098                        size_t         * pexCount )
1099{
1100    size_t i;
1101    size_t n = compactLen / 6;
1102    const uint8_t * walk = compact;
1103    tr_pex * pex = tr_new0( tr_pex, n );
1104
1105    for( i=0; i<n; ++i ) {
1106        memcpy( &pex[i].in_addr, walk, 4 ); walk += 4;
1107        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1108        if( added_f && ( n == added_f_len ) )
1109            pex[i].flags = added_f[i];
1110    }
1111
1112    *pexCount = n;
1113    return pex;
1114}
1115
1116/**
1117***
1118**/
1119
1120void
1121tr_peerMgrSetBlame( tr_peerMgr     * manager,
1122                    const uint8_t  * torrentHash,
1123                    tr_piece_index_t pieceIndex,
1124                    int              success )
1125{
1126    if( !success )
1127    {
1128        int peerCount, i;
1129        Torrent * t = getExistingTorrent( manager, torrentHash );
1130        tr_peer ** peers;
1131
1132        assert( torrentIsLocked( t ) );
1133
1134        peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1135        for( i=0; i<peerCount; ++i )
1136        {
1137            tr_peer * peer = peers[i];
1138            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1139            {
1140                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1141                           tr_peerIoAddrStr(&peer->in_addr,peer->port),
1142                           pieceIndex, (int)peer->strikes+1 );
1143                addStrike( t, peer );
1144            }
1145        }
1146    }
1147}
1148
1149int
1150tr_pexCompare( const void * va, const void * vb )
1151{
1152    const tr_pex * a = va;
1153    const tr_pex * b = vb;
1154    int i = memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
1155    if( i ) return i;
1156    if( a->port < b->port ) return -1;
1157    if( a->port > b->port ) return 1;
1158    return 0;
1159}
1160
1161int tr_pexCompare( const void * a, const void * b );
1162
1163static int
1164peerPrefersCrypto( const tr_peer * peer )
1165{
1166    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1167        return TRUE;
1168
1169    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1170        return FALSE;
1171
1172    return tr_peerIoIsEncrypted( peer->io );
1173};
1174
1175int
1176tr_peerMgrGetPeers( tr_peerMgr      * manager,
1177                    const uint8_t   * torrentHash,
1178                    tr_pex         ** setme_pex )
1179{
1180    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1181    int i, peerCount;
1182    const tr_peer ** peers;
1183    tr_pex * pex;
1184    tr_pex * walk;
1185
1186    managerLock( manager );
1187
1188    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1189    pex = walk = tr_new( tr_pex, peerCount );
1190
1191    for( i=0; i<peerCount; ++i, ++walk )
1192    {
1193        const tr_peer * peer = peers[i];
1194
1195        walk->in_addr = peer->in_addr;
1196
1197        walk->port = peer->port;
1198
1199        walk->flags = 0;
1200        if( peerPrefersCrypto(peer) )  walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1201        if( peer->progress >= 1.0 )    walk->flags |= ADDED_F_SEED_FLAG;
1202    }
1203
1204    assert( ( walk - pex ) == peerCount );
1205    qsort( pex, peerCount, sizeof(tr_pex), tr_pexCompare );
1206    *setme_pex = pex;
1207
1208    managerUnlock( manager );
1209
1210    return peerCount;
1211}
1212
1213static int reconnectPulse( void * vtorrent );
1214static int rechokePulse( void * vtorrent );
1215
1216void
1217tr_peerMgrStartTorrent( tr_peerMgr     * manager,
1218                        const uint8_t  * torrentHash )
1219{
1220    Torrent * t;
1221
1222    managerLock( manager );
1223
1224    t = getExistingTorrent( manager, torrentHash );
1225
1226    assert( t );
1227    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1228    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1229
1230    if( !t->isRunning )
1231    {
1232        t->isRunning = 1;
1233
1234        t->reconnectTimer = tr_timerNew( t->manager->handle,
1235                                         reconnectPulse, t,
1236                                         RECONNECT_PERIOD_MSEC );
1237
1238        t->rechokeTimer = tr_timerNew( t->manager->handle,
1239                                       rechokePulse, t,
1240                                       RECHOKE_PERIOD_MSEC );
1241
1242        reconnectPulse( t );
1243
1244        rechokePulse( t );
1245
1246        if( !tr_ptrArrayEmpty( t->webseeds ) )
1247            refillSoon( t );
1248    }
1249
1250    managerUnlock( manager );
1251}
1252
1253static void
1254stopTorrent( Torrent * t )
1255{
1256    assert( torrentIsLocked( t ) );
1257
1258    t->isRunning = 0;
1259    tr_timerFree( &t->rechokeTimer );
1260    tr_timerFree( &t->reconnectTimer );
1261
1262    /* disconnect the peers. */
1263    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)peerDestructor );
1264    tr_ptrArrayClear( t->peers );
1265
1266    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1267     * which removes the handshake from t->outgoingHandshakes... */
1268    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1269        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1270}
1271void
1272tr_peerMgrStopTorrent( tr_peerMgr     * manager,
1273                       const uint8_t  * torrentHash)
1274{
1275    managerLock( manager );
1276
1277    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1278
1279    managerUnlock( manager );
1280}
1281
1282void
1283tr_peerMgrAddTorrent( tr_peerMgr * manager,
1284                      tr_torrent * tor )
1285{
1286    Torrent * t;
1287
1288    managerLock( manager );
1289
1290    assert( tor );
1291    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1292
1293    t = torrentConstructor( manager, tor );
1294    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1295
1296    managerUnlock( manager );
1297}
1298
1299void
1300tr_peerMgrRemoveTorrent( tr_peerMgr     * manager,
1301                         const uint8_t  * torrentHash )
1302{
1303    Torrent * t;
1304
1305    managerLock( manager );
1306
1307    t = getExistingTorrent( manager, torrentHash );
1308    assert( t );
1309    stopTorrent( t );
1310    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1311    torrentDestructor( t );
1312
1313    managerUnlock( manager );
1314}
1315
1316void
1317tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1318                               const uint8_t    * torrentHash,
1319                               int8_t           * tab,
1320                               unsigned int       tabCount )
1321{
1322    tr_piece_index_t i;
1323    const Torrent * t;
1324    const tr_torrent * tor;
1325    float interval;
1326    int isComplete;
1327    int peerCount;
1328    const tr_peer ** peers;
1329
1330    managerLock( manager );
1331
1332    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1333    tor = t->tor;
1334    interval = tor->info.pieceCount / (float)tabCount;
1335    isComplete = tor && ( tr_cpGetStatus ( tor->completion ) == TR_CP_COMPLETE );
1336    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1337
1338    memset( tab, 0, tabCount );
1339
1340    for( i=0; tor && i<tabCount; ++i )
1341    {
1342        const int piece = i * interval;
1343
1344        if( isComplete || tr_cpPieceIsComplete( tor->completion, piece ) )
1345            tab[i] = -1;
1346        else if( peerCount ) {
1347            int j;
1348            for( j=0; j<peerCount; ++j )
1349                if( tr_bitfieldHas( peers[j]->have, i ) )
1350                    ++tab[i];
1351        }
1352    }
1353
1354    managerUnlock( manager );
1355}
1356
1357/* Returns the pieces that are available from peers */
1358tr_bitfield*
1359tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1360                        const uint8_t    * torrentHash )
1361{
1362    int i, size;
1363    Torrent * t;
1364    tr_peer ** peers;
1365    tr_bitfield * pieces;
1366    managerLock( manager );
1367
1368    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1369    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1370    peers = getConnectedPeers( t, &size );
1371    for( i=0; i<size; ++i )
1372        tr_bitfieldOr( pieces, peers[i]->have );
1373
1374    managerUnlock( manager );
1375    tr_free( peers );
1376    return pieces;
1377}
1378
1379int
1380tr_peerMgrHasConnections( const tr_peerMgr * manager,
1381                          const uint8_t    * torrentHash )
1382{
1383    int ret;
1384    const Torrent * t;
1385    managerLock( manager );
1386
1387    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1388    ret = t && ( !tr_ptrArrayEmpty( t->peers ) || !tr_ptrArrayEmpty( t->webseeds ) );
1389
1390    managerUnlock( manager );
1391    return ret;
1392}
1393
1394void
1395tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1396                        const uint8_t    * torrentHash,
1397                        int              * setmePeersKnown,
1398                        int              * setmePeersConnected,
1399                        int              * setmeSeedsConnected,
1400                        int              * setmeWebseedsSendingToUs,
1401                        int              * setmePeersSendingToUs,
1402                        int              * setmePeersGettingFromUs,
1403                        int              * setmePeersFrom )
1404{
1405    int i, size;
1406    const Torrent * t;
1407    const tr_peer ** peers;
1408    const tr_webseed ** webseeds;
1409
1410    managerLock( manager );
1411
1412    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1413    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1414
1415    *setmePeersKnown           = tr_ptrArraySize( t->pool );
1416    *setmePeersConnected       = 0;
1417    *setmeSeedsConnected       = 0;
1418    *setmePeersGettingFromUs   = 0;
1419    *setmePeersSendingToUs     = 0;
1420    *setmeWebseedsSendingToUs  = 0;
1421
1422    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1423        setmePeersFrom[i] = 0;
1424
1425    for( i=0; i<size; ++i )
1426    {
1427        const tr_peer * peer = peers[i];
1428        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1429
1430        if( peer->io == NULL ) /* not connected */
1431            continue;
1432
1433        ++*setmePeersConnected;
1434
1435        ++setmePeersFrom[atom->from];
1436
1437        if( clientIsDownloadingFrom( peer ) )
1438            ++*setmePeersSendingToUs;
1439
1440        if( clientIsUploadingTo( peer ) )
1441            ++*setmePeersGettingFromUs;
1442
1443        if( atom->flags & ADDED_F_SEED_FLAG )
1444            ++*setmeSeedsConnected;
1445    }
1446
1447    webseeds = (const tr_webseed **) tr_ptrArrayPeek( t->webseeds, &size );
1448    for( i=0; i<size; ++i )
1449    {
1450        if( tr_webseedIsActive( webseeds[i] ) )
1451            ++*setmeWebseedsSendingToUs;
1452    }
1453
1454    managerUnlock( manager );
1455}
1456
1457float*
1458tr_peerMgrWebSpeeds( const tr_peerMgr * manager,
1459                     const uint8_t    * torrentHash )
1460{
1461    const Torrent * t;
1462    const tr_webseed ** webseeds;
1463    int i;
1464    int webseedCount;
1465    float * ret;
1466
1467    assert( manager );
1468    managerLock( manager );
1469
1470    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1471    webseeds = (const tr_webseed**) tr_ptrArrayPeek( t->webseeds, &webseedCount );
1472    assert( webseedCount == t->tor->info.webseedCount );
1473    ret = tr_new0( float, webseedCount );
1474
1475    for( i=0; i<webseedCount; ++i )
1476        if( !tr_webseedGetSpeed( webseeds[i], &ret[i] ) )
1477            ret[i] = -1.0;
1478
1479    managerUnlock( manager );
1480    return ret;
1481}
1482
1483struct tr_peer_stat *
1484tr_peerMgrPeerStats( const tr_peerMgr  * manager,
1485                     const uint8_t     * torrentHash,
1486                     int               * setmeCount UNUSED )
1487{
1488    int i, size;
1489    const Torrent * t;
1490    tr_peer ** peers;
1491    tr_peer_stat * ret;
1492
1493    assert( manager );
1494    managerLock( manager );
1495
1496    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1497    peers = getConnectedPeers( (Torrent*)t, &size );
1498    ret = tr_new0( tr_peer_stat, size );
1499
1500    for( i=0; i<size; ++i )
1501    {
1502        char * pch;
1503        const tr_peer * peer = peers[i];
1504        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1505        tr_peer_stat * stat = ret + i;
1506
1507        tr_netNtop( &peer->in_addr, stat->addr, sizeof(stat->addr) );
1508        tr_strlcpy( stat->client, (peer->client ? peer->client : ""), sizeof(stat->client) );
1509        stat->port               = peer->port;
1510        stat->from               = atom->from;
1511        stat->progress           = peer->progress;
1512        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1513        stat->uploadToRate       = peer->rateToPeer;
1514        stat->downloadFromRate   = peer->rateToClient;
1515        stat->peerIsChoked       = peer->peerIsChoked;
1516        stat->peerIsInterested   = peer->peerIsInterested;
1517        stat->clientIsChoked     = peer->clientIsChoked;
1518        stat->clientIsInterested = peer->clientIsInterested;
1519        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1520        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1521        stat->isUploadingTo      = clientIsUploadingTo( peer );
1522
1523        pch = stat->flagStr;
1524        if( t->optimistic == peer ) *pch++ = 'O';
1525        if( stat->isDownloadingFrom ) *pch++ = 'D';
1526        else if( stat->clientIsInterested ) *pch++ = 'd';
1527        if( stat->isUploadingTo ) *pch++ = 'U';
1528        else if( stat->peerIsInterested ) *pch++ = 'u';
1529        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
1530        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1531        if( stat->isEncrypted ) *pch++ = 'E';
1532        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1533        if( stat->isIncoming ) *pch++ = 'I';
1534        *pch = '\0';
1535    }
1536
1537    *setmeCount = size;
1538    tr_free( peers );
1539
1540    managerUnlock( manager );
1541    return ret;
1542}
1543
1544/**
1545***
1546**/
1547
1548struct ChokeData
1549{
1550    unsigned int  doUnchoke     : 1;
1551    unsigned int  isInterested  : 1;
1552    tr_peer * peer;
1553};
1554
1555static int
1556compareChoke( const void * va, const void * vb )
1557{
1558    const struct ChokeData * a = va;
1559    const struct ChokeData * b = vb;
1560    int diff = 0;
1561
1562    if( diff == 0 ) /* prefer higher dl speeds */
1563        diff = -tr_compareDouble( a->peer->rateToClient, b->peer->rateToClient );
1564    if( diff == 0 ) /* prefer higher ul speeds */
1565        diff = -tr_compareDouble( a->peer->rateToPeer, b->peer->rateToPeer );
1566    if( diff == 0 ) /* prefer unchoked */
1567        diff = (int)a->peer->peerIsChoked - (int)b->peer->peerIsChoked;
1568
1569    return diff;
1570}
1571
1572static int
1573isNew( const tr_peer * peer )
1574{
1575    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1576}
1577
1578static int
1579isSame( const tr_peer * peer )
1580{
1581    return peer && peer->client && strstr( peer->client, "Transmission" );
1582}
1583
1584/**
1585***
1586**/
1587
1588static void
1589rechoke( Torrent * t )
1590{
1591    int i, peerCount, size, unchokedInterested;
1592    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1593    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1594
1595    assert( torrentIsLocked( t ) );
1596   
1597    /* sort the peers by preference and rate */
1598    for( i=0, size=0; i<peerCount; ++i )
1599    {
1600        tr_peer * peer = peers[i];
1601        if( peer->progress >= 1.0 ) /* choke all seeds */
1602            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1603        else {
1604            struct ChokeData * node = &choke[size++];
1605            node->peer = peer;
1606            node->isInterested = peer->peerIsInterested;
1607        }
1608    }
1609
1610    qsort( choke, size, sizeof(struct ChokeData), compareChoke );
1611
1612    /**
1613     * Reciprocation and number of uploads capping is managed by unchoking
1614     * the N peers which have the best upload rate and are interested.
1615     * This maximizes the client's download rate. These N peers are
1616     * referred to as downloaders, because they are interested in downloading
1617     * from the client.
1618     *
1619     * Peers which have a better upload rate (as compared to the downloaders)
1620     * but aren't interested get unchoked. If they become interested, the
1621     * downloader with the worst upload rate gets choked. If a client has
1622     * a complete file, it uses its upload rate rather than its download
1623     * rate to decide which peers to unchoke.
1624     */
1625    unchokedInterested = 0;
1626    for( i=0; i<size && unchokedInterested<MAX_UNCHOKED_PEERS; ++i ) {
1627        choke[i].doUnchoke = 1;
1628        if( choke[i].isInterested )
1629            ++unchokedInterested;
1630    }
1631
1632    /* optimistic unchoke */
1633    if( i < size )
1634    {
1635        int n;
1636        struct ChokeData * c;
1637        tr_ptrArray * randPool = tr_ptrArrayNew( );
1638
1639        for( ; i<size; ++i )
1640        {
1641            if( choke[i].isInterested )
1642            {
1643                const tr_peer * peer = choke[i].peer;
1644                int x=1, y;
1645                if( isNew( peer ) ) x *= 3;
1646                if( isSame( peer ) ) x *= 3;
1647                for( y=0; y<x; ++y )
1648                    tr_ptrArrayAppend( randPool, &choke[i] );
1649            }
1650        }
1651
1652        if(( n = tr_ptrArraySize( randPool )))
1653        {
1654            c = tr_ptrArrayNth( randPool, tr_stupidRandInt( n ));
1655            c->doUnchoke = 1;
1656            t->optimistic = c->peer;
1657        }
1658
1659        tr_ptrArrayFree( randPool, NULL );
1660    }
1661
1662    for( i=0; i<size; ++i )
1663        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1664
1665    /* cleanup */
1666    tr_free( choke );
1667    tr_free( peers );
1668}
1669
1670static int
1671rechokePulse( void * vtorrent )
1672{
1673    Torrent * t = vtorrent;
1674    torrentLock( t );
1675    rechoke( t );
1676    torrentUnlock( t );
1677    return TRUE;
1678}
1679
1680/***
1681****
1682****  Life and Death
1683****
1684***/
1685
1686static int
1687shouldPeerBeClosed( const Torrent * t, const tr_peer * peer, int peerCount )
1688{
1689    const tr_torrent * tor = t->tor;
1690    const time_t now = time( NULL );
1691    const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1692
1693    /* if it's marked for purging, close it */
1694    if( peer->doPurge ) {
1695        tordbg( t, "purging peer %s because its doPurge flag is set", tr_peerIoAddrStr(&atom->addr,atom->port) );
1696        return TRUE;
1697    }
1698
1699    /* if we're seeding and the peer has everything we have,
1700     * and enough time has passed for a pex exchange, then disconnect */
1701    if( tr_torrentIsSeed( tor ) ) {
1702        int peerHasEverything;
1703        if( atom->flags & ADDED_F_SEED_FLAG )
1704            peerHasEverything = TRUE;
1705        else if( peer->progress < tr_cpPercentDone( tor->completion ) )
1706            peerHasEverything = FALSE;
1707        else {
1708            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( tor->completion ) );
1709            tr_bitfieldDifference( tmp, peer->have );
1710            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
1711            tr_bitfieldFree( tmp );
1712        }
1713        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30) ) ) {
1714            tordbg( t, "purging peer %s because we're both seeds", tr_peerIoAddrStr(&atom->addr,atom->port) );
1715            return TRUE;
1716        }
1717    }
1718
1719    /* disconnect if it's been too long since piece data has been transferred.
1720     * this is on a sliding scale based on number of available peers... */
1721    {
1722        const int relaxStrictnessIfFewerThanN = (int)((getMaxPeerCount(tor) * 0.9) + 0.5);
1723        /* if we have >= relaxIfFewerThan, strictness is 100%.
1724         * if we have zero connections, strictness is 0% */
1725        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
1726            ? 1.0
1727            : peerCount / (float)relaxStrictnessIfFewerThanN;
1728        const int lo = MIN_UPLOAD_IDLE_SECS;
1729        const int hi = MAX_UPLOAD_IDLE_SECS;
1730        const int limit = lo + ((hi-lo) * strictness);
1731        const time_t then = peer->pieceDataActivityDate;
1732        const int idleTime = then ? (now-then) : 0;
1733        if( idleTime > limit ) {
1734            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
1735                       tr_peerIoAddrStr(&atom->addr,atom->port), idleTime );
1736            return TRUE;
1737        }
1738    }
1739
1740    return FALSE;
1741}
1742
1743static tr_peer **
1744getPeersToClose( Torrent * t, int * setmeSize )
1745{
1746    int i, peerCount, outsize;
1747    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( t->peers, &peerCount );
1748    struct tr_peer ** ret = tr_new( tr_peer*, peerCount );
1749
1750    assert( torrentIsLocked( t ) );
1751
1752    for( i=outsize=0; i<peerCount; ++i )
1753        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
1754            ret[outsize++] = peers[i];
1755
1756    *setmeSize = outsize;
1757    return ret;
1758}
1759
1760static int
1761compareCandidates( const void * va, const void * vb )
1762{
1763    const struct peer_atom * a = * (const struct peer_atom**) va;
1764    const struct peer_atom * b = * (const struct peer_atom**) vb;
1765    int i;
1766
1767    if( a->piece_data_time > b->piece_data_time ) return -1;
1768    if( a->piece_data_time < b->piece_data_time ) return  1;
1769
1770    if(( i = a->numFails - b->numFails ))
1771        return i;
1772
1773    if( a->time != b->time )
1774        return a->time < b->time ? -1 : 1;
1775
1776    return 0;
1777}
1778
1779static int
1780getReconnectIntervalSecs( const struct peer_atom * atom )
1781{
1782    int sec;
1783
1784    switch( atom->numFails )
1785    {
1786        case 0: sec = 0; break;
1787        case 1: sec = 5; break;
1788        case 2: sec = 2*60; break;
1789        case 3: sec = 15*60; break;
1790        case 4: sec = 30*60; break;
1791        case 5: sec = 60*60; break;
1792        default: sec = 120*60; break;
1793    }
1794
1795    return sec;
1796}
1797
1798static struct peer_atom **
1799getPeerCandidates( Torrent * t, int * setmeSize )
1800{
1801    int i, atomCount, retCount;
1802    struct peer_atom ** atoms;
1803    struct peer_atom ** ret;
1804    const time_t now = time( NULL );
1805    const int seed = tr_torrentIsSeed( t->tor );
1806
1807    assert( torrentIsLocked( t ) );
1808
1809    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &atomCount );
1810    ret = tr_new( struct peer_atom*, atomCount );
1811    for( i=retCount=0; i<atomCount; ++i )
1812    {
1813        struct peer_atom * atom = atoms[i];
1814
1815        /* peer fed us too much bad data ... we only keep it around
1816         * now to weed it out in case someone sends it to us via pex */
1817        if( atom->myflags & MYFLAG_BANNED )
1818            continue;
1819
1820        /* peer was unconnectable before, so we're not going to keep trying.
1821         * this is needs a separate flag from `banned', since if they try
1822         * to connect to us later, we'll let them in */
1823        if( atom->myflags & MYFLAG_UNREACHABLE )
1824            continue;
1825
1826        /* we don't need two connections to the same peer... */
1827        if( peerIsInUse( t, &atom->addr ) )
1828            continue;
1829
1830        /* no need to connect if we're both seeds... */
1831        if( seed && (atom->flags & ADDED_F_SEED_FLAG) )
1832            continue;
1833
1834        /* If we were connected to this peer recently and transferring
1835         * piece data, try to reconnect -- network troubles may have
1836         * disconnected us.  but if we weren't sharing piece data,
1837         * hold off on this peer to give another one a try instead */
1838        if( ( now - atom->piece_data_time ) > 10 )
1839        {
1840            const int wait = getReconnectIntervalSecs( atom );
1841            if( ( now - atom->time ) < wait ) {
1842                tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
1843                        i, tr_peerIoAddrStr(&atom->addr,atom->port), wait );
1844                continue;
1845            }
1846        }
1847
1848        /* Don't connect to peers in our blocklist */
1849        if( tr_sessionIsAddressBlocked( t->manager->handle, &atom->addr ) )
1850            continue;
1851
1852        ret[retCount++] = atom;
1853    }
1854
1855    qsort( ret, retCount, sizeof(struct peer_atom*), compareCandidates );
1856    *setmeSize = retCount;
1857    return ret;
1858}
1859
1860static int
1861reconnectPulse( void * vtorrent )
1862{
1863    Torrent * t = vtorrent;
1864    static time_t prevTime = 0;
1865    static int newConnectionsThisSecond = 0;
1866    time_t now;
1867
1868    torrentLock( t );
1869
1870    now = time( NULL );
1871    if( prevTime != now )
1872    {
1873        prevTime = now;
1874        newConnectionsThisSecond = 0;
1875    }
1876
1877    if( !t->isRunning )
1878    {
1879        removeAllPeers( t );
1880    }
1881    else
1882    {
1883        int i, nCandidates, nBad;
1884        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
1885        struct tr_peer ** connections = getPeersToClose( t, &nBad );
1886
1887        if( nBad || nCandidates )
1888            tordbg( t, "reconnect pulse for [%s]: %d bad connections, "
1889                       "%d connection candidates, %d atoms, max per pulse is %d",
1890                       t->tor->info.name, nBad, nCandidates,
1891                       tr_ptrArraySize(t->pool),
1892                       (int)MAX_RECONNECTIONS_PER_PULSE );
1893
1894        /* disconnect some peers.
1895           if we transferred piece data, then they might be good peers,
1896           so reset their `numFails' weight to zero.  otherwise we connected
1897           to them fruitlessly, so mark it as another fail */
1898        for( i=0; i<nBad; ++i ) {
1899            tr_peer * peer = connections[i];
1900            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1901            if( peer->pieceDataActivityDate )
1902                atom->numFails = 0;
1903            else
1904                ++atom->numFails;
1905            tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
1906            removePeer( t, peer );
1907        }
1908
1909        /* add some new ones */
1910        for( i=0;    i < nCandidates
1911                  && i < MAX_RECONNECTIONS_PER_PULSE
1912                  && newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND; ++i )
1913        {
1914            tr_peerMgr * mgr = t->manager;
1915            struct peer_atom * atom = candidates[i];
1916            tr_peerIo * io;
1917
1918            tordbg( t, "Starting an OUTGOING connection with %s",
1919                       tr_peerIoAddrStr( &atom->addr, atom->port ) );
1920
1921            io = tr_peerIoNewOutgoing( mgr->handle, &atom->addr, atom->port, t->hash );
1922            if( io == NULL )
1923            {
1924                atom->myflags |= MYFLAG_UNREACHABLE;
1925            }
1926            else
1927            {
1928                tr_handshake * handshake = tr_handshakeNew( io,
1929                                                            mgr->handle->encryptionMode,
1930                                                            myHandshakeDoneCB,
1931                                                            mgr );
1932
1933                assert( tr_peerIoGetTorrentHash( io ) );
1934
1935                ++newConnectionsThisSecond;
1936
1937                tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake, handshakeCompare );
1938            }
1939
1940            atom->time = time( NULL );
1941        }
1942
1943        /* cleanup */
1944        tr_free( connections );
1945        tr_free( candidates );
1946    }
1947
1948    torrentUnlock( t );
1949    return TRUE;
1950}
Note: See TracBrowser for help on using the repository browser.