source: trunk/libtransmission/peer-mgr.c @ 6163

Last change on this file since 6163 was 6163, checked in by charles, 14 years ago

tr_peerMgrHasConnections(): take webseeds into account, as suggested by BentMyWookie?

  • Property svn:keywords set to Date Rev Author Id
File size: 54.3 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 6163 2008-06-12 16:28:39Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp, strstr */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "blocklist.h"
23#include "clients.h"
24#include "completion.h"
25#include "crypto.h"
26#include "handshake.h"
27#include "inout.h" /* tr_ioTestPiece */
28#include "net.h"
29#include "peer-io.h"
30#include "peer-mgr.h"
31#include "peer-mgr-private.h"
32#include "peer-msgs.h"
33#include "ptrarray.h"
34#include "ratecontrol.h"
35#include "stats.h" /* tr_statsAddDownloaded */
36#include "torrent.h"
37#include "trevent.h"
38#include "utils.h"
39#include "webseed.h"
40
41enum
42{
43    /* how frequently to change which peers are choked */
44    RECHOKE_PERIOD_MSEC = (10 * 1000),
45
46    /* how frequently to refill peers' request lists */
47    REFILL_PERIOD_MSEC = 666,
48
49    /* following the BT spec, we consider ourselves `snubbed' if
50     * we're we don't get piece data from a peer in this long */
51    SNUBBED_SEC = 60,
52
53    /* when many peers are available, keep idle ones this long */
54    MIN_UPLOAD_IDLE_SECS = (60 * 3),
55
56    /* when few peers are available, keep idle ones this long */
57    MAX_UPLOAD_IDLE_SECS = (60 * 10),
58
59    /* how frequently to decide which peers live and die */
60    RECONNECT_PERIOD_MSEC = (2 * 1000),
61
62    /* max # of peers to ask fer per torrent per reconnect pulse */
63    MAX_RECONNECTIONS_PER_PULSE = 1,
64
65    /* max number of peers to ask for per second overall.
66     * this throttle is to avoid overloading the router */
67    MAX_CONNECTIONS_PER_SECOND = 8,
68
69    /* number of unchoked peers per torrent */
70    MAX_UNCHOKED_PEERS = 12,
71
72    /* number of bad pieces a peer is allowed to send before we ban them */
73    MAX_BAD_PIECES_PER_PEER = 3,
74
75    /* use for bitwise operations w/peer_atom.myflags */
76    MYFLAG_BANNED = 1,
77
78    /* unreachable for now... but not banned.  if they try to connect to us it's okay */
79    MYFLAG_UNREACHABLE = 2
80};
81
82
83/**
84***
85**/
86
87/* We keep one of these for every peer we know about, whether
88 * it's connected or not, so the struct must be small.
89 * When our current connections underperform, we dip back
90 * into this list for new ones. */
91struct peer_atom
92{   
93    uint8_t from;
94    uint8_t flags; /* these match the added_f flags */
95    uint8_t myflags; /* flags that aren't defined in added_f */
96    uint16_t port;
97    uint16_t numFails;
98    struct in_addr addr;
99    time_t time;
100    time_t piece_data_time;
101};
102
103typedef struct
104{
105    uint8_t hash[SHA_DIGEST_LENGTH];
106    tr_ptrArray * outgoingHandshakes; /* tr_handshake */
107    tr_ptrArray * pool; /* struct peer_atom */
108    tr_ptrArray * peers; /* tr_peer */
109    tr_ptrArray * webseeds; /* tr_webseed */
110    tr_timer * reconnectTimer;
111    tr_timer * rechokeTimer;
112    tr_timer * refillTimer;
113    tr_torrent * tor;
114    tr_peer * optimistic; /* the optimistic peer, or NULL if none */
115    tr_bitfield * requestedPieces;
116
117    unsigned int isRunning : 1;
118
119    struct tr_peerMgr * manager;
120}
121Torrent;
122
123struct tr_peerMgr
124{
125    tr_handle * handle;
126    tr_ptrArray * torrents; /* Torrent */
127    tr_ptrArray * incomingHandshakes; /* tr_handshake */
128};
129
130#define tordbg(t, fmt...) tr_deepLog( __FILE__, __LINE__, t->tor->info.name, ##fmt )
131
132/**
133***
134**/
135
136static void
137managerLock( const struct tr_peerMgr * manager )
138{
139    tr_globalLock( manager->handle );
140}
141static void
142managerUnlock( const struct tr_peerMgr * manager )
143{
144    tr_globalUnlock( manager->handle );
145}
146static void
147torrentLock( Torrent * torrent )
148{
149    managerLock( torrent->manager );
150}
151static void
152torrentUnlock( Torrent * torrent )
153{
154    managerUnlock( torrent->manager );
155}
156static int
157torrentIsLocked( const Torrent * t )
158{
159    return tr_globalIsLocked( t->manager->handle );
160}
161
162/**
163***
164**/
165
166static int
167compareAddresses( const struct in_addr * a, const struct in_addr * b )
168{
169    return tr_compareUint32( a->s_addr, b->s_addr );
170}
171
172static int
173handshakeCompareToAddr( const void * va, const void * vb )
174{
175    const tr_handshake * a = va;
176    return compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
177}
178
179static int
180handshakeCompare( const void * a, const void * b )
181{
182    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
183}
184
185static tr_handshake*
186getExistingHandshake( tr_ptrArray * handshakes, const struct in_addr * in_addr )
187{
188    return tr_ptrArrayFindSorted( handshakes,
189                                  in_addr,
190                                  handshakeCompareToAddr );
191}
192
193static int
194comparePeerAtomToAddress( const void * va, const void * vb )
195{
196    const struct peer_atom * a = va;
197    return compareAddresses( &a->addr, vb );
198}
199
200static int
201comparePeerAtoms( const void * va, const void * vb )
202{
203    const struct peer_atom * b = vb;
204    return comparePeerAtomToAddress( va, &b->addr );
205}
206
207/**
208***
209**/
210
211static int
212torrentCompare( const void * va, const void * vb )
213{
214    const Torrent * a = va;
215    const Torrent * b = vb;
216    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
217}
218
219static int
220torrentCompareToHash( const void * va, const void * vb )
221{
222    const Torrent * a = va;
223    const uint8_t * b_hash = vb;
224    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
225}
226
227static Torrent*
228getExistingTorrent( tr_peerMgr * manager, const uint8_t * hash )
229{
230    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
231                                             hash,
232                                             torrentCompareToHash );
233}
234
235static int
236peerCompare( const void * va, const void * vb )
237{
238    const tr_peer * a = va;
239    const tr_peer * b = vb;
240    return compareAddresses( &a->in_addr, &b->in_addr );
241}
242
243static int
244peerCompareToAddr( const void * va, const void * vb )
245{
246    const tr_peer * a = va;
247    return compareAddresses( &a->in_addr, vb );
248}
249
250static tr_peer*
251getExistingPeer( Torrent * torrent, const struct in_addr * in_addr )
252{
253    assert( torrentIsLocked( torrent ) );
254    assert( in_addr != NULL );
255
256    return (tr_peer*) tr_ptrArrayFindSorted( torrent->peers,
257                                             in_addr,
258                                             peerCompareToAddr );
259}
260
261static struct peer_atom*
262getExistingAtom( const Torrent * t, const struct in_addr * addr )
263{
264    assert( torrentIsLocked( t ) );
265    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
266}
267
268static int
269peerIsInUse( const Torrent * ct, const struct in_addr * addr )
270{
271    Torrent * t = (Torrent*) ct;
272
273    assert( torrentIsLocked ( t ) );
274
275    return getExistingPeer( t, addr )
276        || getExistingHandshake( t->outgoingHandshakes, addr )
277        || getExistingHandshake( t->manager->incomingHandshakes, addr );
278}
279
280static tr_peer*
281peerConstructor( const struct in_addr * in_addr )
282{
283    tr_peer * p;
284    p = tr_new0( tr_peer, 1 );
285    p->rcToClient = tr_rcInit( );
286    p->rcToPeer = tr_rcInit( );
287    memcpy( &p->in_addr, in_addr, sizeof(struct in_addr) );
288    return p;
289}
290
291static tr_peer*
292getPeer( Torrent * torrent, const struct in_addr * in_addr )
293{
294    tr_peer * peer;
295
296    assert( torrentIsLocked( torrent ) );
297
298    peer = getExistingPeer( torrent, in_addr );
299
300    if( peer == NULL ) {
301        peer = peerConstructor( in_addr );
302        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
303    }
304
305    return peer;
306}
307
308static void
309peerDestructor( tr_peer * peer )
310{
311    assert( peer != NULL );
312    assert( peer->msgs != NULL );
313
314    tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
315    tr_peerMsgsFree( peer->msgs );
316
317    tr_peerIoFree( peer->io );
318
319    tr_bitfieldFree( peer->have );
320    tr_bitfieldFree( peer->blame );
321    tr_rcClose( peer->rcToClient );
322    tr_rcClose( peer->rcToPeer );
323    tr_free( peer->client );
324    tr_free( peer );
325}
326
327static void
328removePeer( Torrent * t, tr_peer * peer )
329{
330    tr_peer * removed;
331    struct peer_atom * atom;
332
333    assert( torrentIsLocked( t ) );
334
335    atom = getExistingAtom( t, &peer->in_addr );
336    assert( atom != NULL );
337    atom->time = time( NULL );
338
339    removed = tr_ptrArrayRemoveSorted( t->peers, peer, peerCompare );
340    assert( removed == peer );
341    peerDestructor( removed );
342}
343
344static void
345removeAllPeers( Torrent * t )
346{
347    while( !tr_ptrArrayEmpty( t->peers ) )
348        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
349}
350
351static void
352torrentDestructor( Torrent * t )
353{
354    uint8_t hash[SHA_DIGEST_LENGTH];
355
356    assert( t != NULL );
357    assert( !t->isRunning );
358    assert( t->peers != NULL );
359    assert( torrentIsLocked( t ) );
360    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
361    assert( tr_ptrArrayEmpty( t->peers ) );
362
363    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
364
365    tr_timerFree( &t->reconnectTimer );
366    tr_timerFree( &t->rechokeTimer );
367    tr_timerFree( &t->refillTimer );
368
369    tr_bitfieldFree( t->requestedPieces );
370    tr_ptrArrayFree( t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
371    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
372    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
373    tr_ptrArrayFree( t->peers, NULL );
374
375    tr_free( t );
376}
377
378static void peerCallbackFunc( void * vpeer, void * vevent, void * vt );
379
380static Torrent*
381torrentConstructor( tr_peerMgr * manager, tr_torrent * tor )
382{
383    int i;
384    Torrent * t;
385
386    t = tr_new0( Torrent, 1 );
387    t->manager = manager;
388    t->tor = tor;
389    t->pool = tr_ptrArrayNew( );
390    t->peers = tr_ptrArrayNew( );
391    t->webseeds = tr_ptrArrayNew( );
392    t->outgoingHandshakes = tr_ptrArrayNew( );
393    t->requestedPieces = tr_bitfieldNew( tor->info.pieceCount );
394    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
395
396    for( i=0; i<tor->info.webseedCount; ++i ) {
397        tr_webseed * w = tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
398        tr_ptrArrayAppend( t->webseeds, w );
399    }
400
401    return t;
402}
403
404/**
405 * For explanation, see http://www.bittorrent.org/fast_extensions.html
406 * Also see the "test-allowed-set" unit test
407 */
408struct tr_bitfield *
409tr_peerMgrGenerateAllowedSet( const uint32_t         k,         /* number of pieces in set */
410                              const uint32_t         sz,        /* number of pieces in torrent */
411                              const uint8_t        * infohash,  /* torrent's SHA1 hash*/
412                              const struct in_addr * ip )       /* peer's address */
413{
414    uint8_t w[SHA_DIGEST_LENGTH + 4];
415    uint8_t x[SHA_DIGEST_LENGTH];
416    tr_bitfield_t * a;
417    uint32_t a_size;
418
419    *(uint32_t*)w = ntohl( htonl(ip->s_addr) & 0xffffff00 );   /* (1) */
420    memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );              /* (2) */
421    tr_sha1( x, w, sizeof( w ), NULL );                        /* (3) */
422
423    a = tr_bitfieldNew( sz );
424    a_size = 0;
425   
426    while( a_size < k )
427    {
428        int i;
429        for ( i=0; i<5 && a_size<k; ++i )                      /* (4) */
430        {
431            uint32_t j = i * 4;                                /* (5) */
432            uint32_t y = ntohl(*(uint32_t*)(x+j));             /* (6) */
433            uint32_t index = y % sz;                           /* (7) */
434            if ( !tr_bitfieldHas( a, index ) ) {               /* (8) */
435                tr_bitfieldAdd( a, index );                    /* (9) */
436                ++a_size;
437            }
438        }
439        tr_sha1( x, x, sizeof( x ), NULL );                    /* (3) */
440    }
441   
442    return a;
443}
444
445tr_peerMgr*
446tr_peerMgrNew( tr_handle * handle )
447{
448    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
449    m->handle = handle;
450    m->torrents = tr_ptrArrayNew( );
451    m->incomingHandshakes = tr_ptrArrayNew( );
452    return m;
453}
454
455void
456tr_peerMgrFree( tr_peerMgr * manager )
457{
458    managerLock( manager );
459
460    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
461     * the item from manager->handshakes, so this is a little roundabout... */
462    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
463        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
464    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
465
466    /* free the torrents. */
467    tr_ptrArrayFree( manager->torrents, (PtrArrayForeachFunc)torrentDestructor );
468
469    managerUnlock( manager );
470    tr_free( manager );
471}
472
473static tr_peer**
474getConnectedPeers( Torrent * t, int * setmeCount )
475{
476    int i, peerCount, connectionCount;
477    tr_peer **peers;
478    tr_peer **ret;
479
480    assert( torrentIsLocked( t ) );
481
482    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
483    ret = tr_new( tr_peer*, peerCount );
484
485    for( i=connectionCount=0; i<peerCount; ++i )
486        if( peers[i]->msgs != NULL )
487            ret[connectionCount++] = peers[i];
488
489    *setmeCount = connectionCount;
490    return ret;
491}
492
493static int
494clientIsDownloadingFrom( const tr_peer * peer )
495{
496    return peer->clientIsInterested && !peer->clientIsChoked;
497}
498
499static int
500clientIsUploadingTo( const tr_peer * peer )
501{
502    return peer->peerIsInterested && !peer->peerIsChoked;
503}
504
505/***
506****
507***/
508
509int
510tr_peerMgrPeerIsSeed( const tr_peerMgr       * mgr,
511                      const uint8_t          * torrentHash,
512                      const struct in_addr   * addr )
513{
514    int isSeed = FALSE;
515    const Torrent * t = NULL;
516    const struct peer_atom * atom = NULL;
517
518    t = getExistingTorrent( (tr_peerMgr*)mgr, torrentHash );
519    if( t )
520        atom = getExistingAtom( t, addr );
521    if( atom )
522        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
523
524    return isSeed;
525}
526
527/***
528****  Refill
529***/
530
531struct tr_refill_piece
532{
533    tr_priority_t priority;
534    int missingBlockCount;
535    uint16_t random;
536    uint32_t piece;
537    uint32_t peerCount;
538};
539
540static int
541compareRefillPiece (const void * aIn, const void * bIn)
542{
543    const struct tr_refill_piece * a = aIn;
544    const struct tr_refill_piece * b = bIn;
545
546    /* if one piece has a higher priority, it goes first */
547    if( a->priority != b->priority )
548        return a->priority > b->priority ? -1 : 1;
549   
550    /* otherwise if one has fewer peers, it goes first */
551    if (a->peerCount != b->peerCount)
552        return a->peerCount < b->peerCount ? -1 : 1;
553
554    /* otherwise go with our random seed */
555    return tr_compareUint16( a->random, b->random );
556}
557
558static int
559isPieceInteresting( const tr_torrent  * tor,
560                    tr_piece_index_t    piece )
561{
562    if( tor->info.pieces[piece].dnd ) /* we don't want it */
563        return 0;
564
565    if( tr_cpPieceIsComplete( tor->completion, piece ) ) /* we have it */
566        return 0;
567
568    return 1;
569}
570
571static uint32_t*
572getPreferredPieces( Torrent     * t,
573                    uint32_t    * pieceCount )
574{
575    const tr_torrent * tor = t->tor;
576    const tr_info * inf = &tor->info;
577    tr_piece_index_t i;
578    uint32_t poolSize = 0;
579    uint32_t * pool = tr_new( uint32_t, inf->pieceCount );
580    int peerCount;
581    tr_peer** peers;
582
583    assert( torrentIsLocked( t ) );
584
585    peers = getConnectedPeers( t, &peerCount );
586
587    for( i=0; i<inf->pieceCount; ++i )
588        if( isPieceInteresting( tor, i ) )
589            pool[poolSize++] = i;
590
591    /* sort the pool from most interesting to least... */
592    if( poolSize > 1 )
593    {
594        uint32_t j;
595        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
596
597        for( j=0; j<poolSize; ++j )
598        {
599            int k;
600            const int piece = pool[j];
601            struct tr_refill_piece * setme = p + j;
602
603            setme->piece = piece;
604            setme->priority = inf->pieces[piece].priority;
605            setme->peerCount = 0;
606            setme->random = tr_rand( UINT16_MAX );
607            setme->missingBlockCount = tr_cpMissingBlocksInPiece( tor->completion, piece );
608
609            for( k=0; k<peerCount; ++k ) {
610                const tr_peer * peer = peers[k];
611                if( peer->peerIsInterested && !peer->clientIsChoked && tr_bitfieldHas( peer->have, piece ) )
612                    ++setme->peerCount;
613            }
614        }
615
616        qsort( p, poolSize, sizeof(struct tr_refill_piece), compareRefillPiece );
617
618        for( j=0; j<poolSize; ++j )
619            pool[j] = p[j].piece;
620
621        tr_free( p );
622    }
623
624    tr_free( peers );
625
626    *pieceCount = poolSize;
627    return pool;
628}
629
630static tr_peer**
631getPeersUploadingToClient( Torrent * t, int * setmeCount )
632{
633    int i;
634    int peerCount = 0;
635    int retCount = 0;
636    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
637    tr_peer ** ret = tr_new( tr_peer*, peerCount );
638
639    /* get a list of peers we're downloading from */
640    for( i=0; i<peerCount; ++i )
641        if( clientIsDownloadingFrom( peers[i] ) )
642            ret[retCount++] = peers[i];
643
644    /* pick a different starting point each time so all peers
645     * get a chance at the first blocks in the queue */
646    if( retCount ) {
647        tr_peer ** tmp = tr_new( tr_peer*, retCount );
648        i = tr_rand( retCount );
649        memcpy( tmp, ret, sizeof(tr_peer*) * retCount );
650        memcpy( ret, tmp+i, sizeof(tr_peer*) * (retCount-i) );
651        memcpy( ret+(retCount-i), tmp, sizeof(tr_peer*) * i );
652        tr_free( tmp );
653    }
654
655    *setmeCount = retCount;
656    return ret;
657}
658
659static int
660refillPulse( void * vtorrent )
661{
662    Torrent * t = vtorrent;
663    tr_torrent * tor = t->tor;
664    int peerCount;
665    int webseedCount;
666    tr_peer ** peers;
667    tr_webseed ** webseeds;
668    uint32_t pieceCount;
669    uint32_t * pieces;
670    tr_piece_index_t i;
671
672    if( !t->isRunning )
673        return TRUE;
674    if( tr_torrentIsSeed( t->tor ) )
675        return TRUE;
676
677    torrentLock( t );
678    tordbg( t, "Refilling Request Buffers..." );
679
680    pieces = getPreferredPieces( t, &pieceCount );
681    peers = getPeersUploadingToClient( t, &peerCount );
682    webseedCount = tr_ptrArraySize( t->webseeds );
683    webseeds = tr_memdup( tr_ptrArrayBase(t->webseeds), webseedCount*sizeof(tr_webseed*) );
684
685    for( i=0; (webseedCount || peerCount) && i<pieceCount; ++i )
686    {
687        int j;
688        int handled = FALSE;
689        const tr_piece_index_t piece = pieces[i];
690
691        assert( piece < tor->info.pieceSize );
692
693        /* find a peer who can ask for this piece */
694        for( j=0; !handled && j<peerCount; )
695        {
696            const int val = tr_peerMsgsAddRequest( peers[j]->msgs, piece );
697            switch( val )
698            {
699                case TR_ADDREQ_FULL: 
700                case TR_ADDREQ_CLIENT_CHOKED:
701                    memmove( peers+j, peers+j+1, sizeof(tr_peer*)*(--peerCount-j) );
702                    break;
703                case TR_ADDREQ_MISSING: 
704                case TR_ADDREQ_DUPLICATE: 
705                    ++j;
706                    break;
707                case TR_ADDREQ_OK:
708                    tr_bitfieldAdd( t->requestedPieces, piece );
709                    handled = TRUE;
710                    break;
711                default:
712                    assert( 0 && "unhandled value" );
713                    break;
714            }
715        }
716
717        /* maybe one of the webseeds can do it */
718        for( j=0; !handled && j<webseedCount; )
719        {
720            const int val = tr_webseedAddRequest( webseeds[j], piece );
721            switch( val )
722            {
723                case TR_ADDREQ_FULL: 
724                    memmove( webseeds+j, webseeds+j+1, sizeof(tr_webseed*)*(--webseedCount-j) );
725                    break;
726                case TR_ADDREQ_OK:
727                    tr_bitfieldAdd( t->requestedPieces, piece );
728                    handled = TRUE;
729                    break;
730                default:
731                    assert( 0 && "unhandled value" );
732                    break;
733            }
734        }
735    }
736
737    /* cleanup */
738    tr_free( webseeds );
739    tr_free( peers );
740    tr_free( pieces );
741
742    t->refillTimer = NULL;
743    torrentUnlock( t );
744    return FALSE;
745}
746
747static void
748addStrike( Torrent * t, tr_peer * peer )
749{
750    tordbg( t, "increasing peer %s strike count to %d", tr_peerIoAddrStr(&peer->in_addr,peer->port), peer->strikes+1 );
751
752    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
753    {
754        struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
755        atom->myflags |= MYFLAG_BANNED;
756        peer->doPurge = 1;
757        tordbg( t, "banning peer %s", tr_peerIoAddrStr(&atom->addr,atom->port) );
758    }
759}
760
761static void
762gotBadPiece( Torrent * t, tr_piece_index_t pieceIndex )
763{
764    tr_torrent * tor = t->tor;
765    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
766    tor->corruptCur += byteCount;
767    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
768}
769
770static void
771refillSoon( Torrent * t )
772{
773    if( t->refillTimer == NULL )
774        t->refillTimer = tr_timerNew( t->manager->handle,
775                                      refillPulse, t,
776                                      REFILL_PERIOD_MSEC );
777}
778
779static void
780peerCallbackFunc( void * vpeer, void * vevent, void * vt )
781{
782    tr_peer * peer = vpeer; /* may be NULL if peer is a webseed */
783    Torrent * t = (Torrent *) vt;
784    const tr_peer_event * e = vevent;
785
786    torrentLock( t );
787
788    switch( e->eventType )
789    {
790        case TR_PEER_NEED_REQ:
791            refillSoon( t );
792            break;
793
794        case TR_PEER_CANCEL:
795            tr_bitfieldRem( t->requestedPieces, e->pieceIndex );
796            break;
797
798        case TR_PEER_PEER_GOT_DATA: {
799            const time_t now = time( NULL );
800            tr_torrent * tor = t->tor;
801            tor->activityDate = now;
802            tor->uploadedCur += e->length;
803            tr_rcTransferred( tor->upload, e->length );
804            tr_rcTransferred( tor->handle->upload, e->length );
805            tr_statsAddUploaded( tor->handle, e->length );
806            if( peer ) {
807                struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
808                atom->piece_data_time = time( NULL );
809            }
810            break;
811        }
812
813        case TR_PEER_CLIENT_GOT_DATA: {
814            const time_t now = time( NULL );
815            tr_torrent * tor = t->tor;
816            tor->activityDate = now;
817            /* only add this to downloadedCur if we got it from a peer --
818             * webseeds shouldn't count against our ratio.  As one tracker
819             * admin put it, "Those pieces are downloaded directly from the
820             * content distributor, not the peers, it is the tracker's job
821             * to manage the swarms, not the web server and does not fit
822             * into the jurisdiction of the tracker." */
823            if( peer )
824                tor->downloadedCur += e->length;
825            tr_rcTransferred( tor->download, e->length );
826            tr_rcTransferred( tor->handle->download, e->length );
827            tr_statsAddDownloaded( tor->handle, e->length );
828            if( peer ) {
829                struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
830                atom->piece_data_time = time( NULL );
831            }
832            break;
833        }
834
835        case TR_PEER_PEER_PROGRESS: {
836            if( peer ) {
837                struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
838                const int peerIsSeed = e->progress >= 1.0;
839                if( peerIsSeed ) {
840                    tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr(&atom->addr,atom->port) );
841                    atom->flags |= ADDED_F_SEED_FLAG;
842                } else {
843                    tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr(&atom->addr,atom->port) );
844                    atom->flags &= ~ADDED_F_SEED_FLAG;
845                }
846            }
847            break;
848        }
849
850        case TR_PEER_CLIENT_GOT_BLOCK:
851        {
852            tr_torrent * tor = t->tor;
853
854            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
855
856            tr_cpBlockAdd( tor->completion, block );
857
858            if( tr_cpPieceIsComplete( tor->completion, e->pieceIndex ) )
859            {
860                const tr_piece_index_t p = e->pieceIndex;
861                const tr_errno err = tr_ioTestPiece( tor, p );
862
863                if( err ) {
864                    tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test: %s" ),
865                               (unsigned long)p, tr_errorString( err ) );
866                }
867
868                tr_torrentSetHasPiece( tor, p, !err );
869                tr_torrentSetPieceChecked( tor, p, TRUE );
870                tr_peerMgrSetBlame( tor->handle->peerMgr, tor->info.hash, p, !err );
871
872                if( err )
873                    gotBadPiece( t, p );
874
875                tr_torrentRecheckCompleteness( tor );
876            }
877            break;
878        }
879
880        case TR_PEER_ERROR:
881            if( TR_ERROR_IS_IO( e->err ) ) {
882                t->tor->error = e->err;
883                tr_strlcpy( t->tor->errorString, tr_errorString( e->err ), sizeof(t->tor->errorString) );
884                tr_torrentStop( t->tor );
885            } else if( e->err == TR_ERROR_ASSERT ) {
886                addStrike( t, peer );
887            }
888            peer->doPurge = 1;
889            break;
890
891        default:
892            assert(0);
893    }
894
895    torrentUnlock( t );
896}
897
898static void
899ensureAtomExists( Torrent * t, const struct in_addr * addr, uint16_t port, uint8_t flags, uint8_t from )
900{
901    if( getExistingAtom( t, addr ) == NULL )
902    {
903        struct peer_atom * a;
904        a = tr_new0( struct peer_atom, 1 );
905        a->addr = *addr;
906        a->port = port;
907        a->flags = flags;
908        a->from = from;
909        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr(&a->addr,a->port) );
910        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
911    }
912}
913
914static int
915getMaxPeerCount( const tr_torrent * tor UNUSED )
916{
917    return tor->maxConnectedPeers;
918}
919
920/* FIXME: this is kind of a mess. */
921static void
922myHandshakeDoneCB( tr_handshake    * handshake,
923                   tr_peerIo       * io,
924                   int               isConnected,
925                   const uint8_t   * peer_id,
926                   void            * vmanager )
927{
928    int ok = isConnected;
929    uint16_t port;
930    const struct in_addr * addr;
931    tr_peerMgr * manager = (tr_peerMgr*) vmanager;
932    Torrent * t;
933    tr_handshake * ours;
934
935    assert( io != NULL );
936    assert( isConnected==0 || isConnected==1 );
937
938    t = tr_peerIoHasTorrentHash( io )
939        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
940        : NULL;
941
942    if( tr_peerIoIsIncoming ( io ) )
943        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
944                                        handshake, handshakeCompare );
945    else if( t != NULL )
946        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
947                                        handshake, handshakeCompare );
948    else
949        ours = handshake;
950
951    assert( ours != NULL );
952    assert( ours == handshake );
953
954    if( t != NULL )
955        torrentLock( t );
956
957    addr = tr_peerIoGetAddress( io, &port );
958
959    if( !ok || !t || !t->isRunning )
960    {
961        if( t ) {
962            struct peer_atom * atom = getExistingAtom( t, addr );
963            if( atom )
964                ++atom->numFails;
965        }
966
967        tr_peerIoFree( io );
968    }
969    else /* looking good */
970    {
971        struct peer_atom * atom;
972        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
973        atom = getExistingAtom( t, addr );
974
975        if( atom->myflags & MYFLAG_BANNED )
976        {
977            tordbg( t, "banned peer %s tried to reconnect", tr_peerIoAddrStr(&atom->addr,atom->port) );
978            tr_peerIoFree( io );
979        }
980        else if( tr_ptrArraySize( t->peers ) >= getMaxPeerCount( t->tor ) )
981        {
982            tr_peerIoFree( io );
983        }
984        else
985        {
986            tr_peer * peer = getExistingPeer( t, addr );
987
988            if( peer != NULL ) /* we already have this peer */
989            {
990                tr_peerIoFree( io );
991            }
992            else
993            {
994                peer = getPeer( t, addr );
995                tr_free( peer->client );
996
997                if( !peer_id )
998                    peer->client = NULL;
999                else {
1000                    char client[128];
1001                    tr_clientForId( client, sizeof( client ), peer_id );
1002                    peer->client = tr_strdup( client );
1003                }
1004                peer->port = port;
1005                peer->io = io;
1006                peer->msgs = tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1007                atom->time = time( NULL );
1008            }
1009        }
1010    }
1011
1012    if( t != NULL )
1013        torrentUnlock( t );
1014}
1015
1016void
1017tr_peerMgrAddIncoming( tr_peerMgr      * manager,
1018                       struct in_addr  * addr,
1019                       uint16_t          port,
1020                       int               socket )
1021{
1022    managerLock( manager );
1023
1024    if( tr_sessionIsAddressBlocked( manager->handle, addr ) )
1025    {
1026        tr_dbg( "Banned IP address \"%s\" tried to connect to us",
1027                inet_ntoa( *addr ) );
1028        tr_netClose( socket );
1029    }
1030    else if( getExistingHandshake( manager->incomingHandshakes, addr ) )
1031    {
1032        tr_netClose( socket );
1033    }
1034    else /* we don't have a connetion to them yet... */
1035    {
1036        tr_peerIo * io;
1037        tr_handshake * handshake;
1038
1039        io = tr_peerIoNewIncoming( manager->handle, addr, port, socket );
1040
1041        handshake = tr_handshakeNew( io,
1042                                     manager->handle->encryptionMode,
1043                                     myHandshakeDoneCB,
1044                                     manager );
1045
1046        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake, handshakeCompare );
1047    }
1048
1049    managerUnlock( manager );
1050}
1051
1052void
1053tr_peerMgrAddPex( tr_peerMgr     * manager,
1054                  const uint8_t  * torrentHash,
1055                  uint8_t          from,
1056                  const tr_pex   * pex )
1057{
1058    Torrent * t;
1059    managerLock( manager );
1060
1061    t = getExistingTorrent( manager, torrentHash );
1062    if( !tr_sessionIsAddressBlocked( t->manager->handle, &pex->in_addr ) )
1063        ensureAtomExists( t, &pex->in_addr, pex->port, pex->flags, from );
1064
1065    managerUnlock( manager );
1066}
1067
1068tr_pex *
1069tr_peerMgrCompactToPex( const void  * compact,
1070                        size_t        compactLen,
1071                        const char  * added_f,
1072                        size_t      * pexCount )
1073{
1074    size_t i;
1075    size_t n = compactLen / 6;
1076    const uint8_t * walk = compact;
1077    tr_pex * pex = tr_new0( tr_pex, n );
1078    for( i=0; i<n; ++i ) {
1079        memcpy( &pex[i].in_addr, walk, 4 ); walk += 4;
1080        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1081        if( added_f )
1082            pex[i].flags = added_f[i];
1083    }
1084    *pexCount = n;
1085    return pex;
1086}
1087
1088/**
1089***
1090**/
1091
1092void
1093tr_peerMgrSetBlame( tr_peerMgr     * manager,
1094                    const uint8_t  * torrentHash,
1095                    int              pieceIndex,
1096                    int              success )
1097{
1098    if( !success )
1099    {
1100        int peerCount, i;
1101        Torrent * t = getExistingTorrent( manager, torrentHash );
1102        tr_peer ** peers;
1103
1104        assert( torrentIsLocked( t ) );
1105
1106        peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1107        for( i=0; i<peerCount; ++i )
1108        {
1109            tr_peer * peer = peers[i];
1110            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1111            {
1112                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1113                           tr_peerIoAddrStr(&peer->in_addr,peer->port),
1114                           pieceIndex, (int)peer->strikes+1 );
1115                addStrike( t, peer );
1116            }
1117        }
1118    }
1119}
1120
1121int
1122tr_pexCompare( const void * va, const void * vb )
1123{
1124    const tr_pex * a = (const tr_pex *) va;
1125    const tr_pex * b = (const tr_pex *) vb;
1126    int i = memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
1127    if( i ) return i;
1128    if( a->port < b->port ) return -1;
1129    if( a->port > b->port ) return 1;
1130    return 0;
1131}
1132
1133int tr_pexCompare( const void * a, const void * b );
1134
1135static int
1136peerPrefersCrypto( const tr_peer * peer )
1137{
1138    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1139        return TRUE;
1140
1141    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1142        return FALSE;
1143
1144    return tr_peerIoIsEncrypted( peer->io );
1145};
1146
1147int
1148tr_peerMgrGetPeers( tr_peerMgr      * manager,
1149                    const uint8_t   * torrentHash,
1150                    tr_pex         ** setme_pex )
1151{
1152    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1153    int i, peerCount;
1154    const tr_peer ** peers;
1155    tr_pex * pex;
1156    tr_pex * walk;
1157
1158    managerLock( manager );
1159
1160    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1161    pex = walk = tr_new( tr_pex, peerCount );
1162
1163    for( i=0; i<peerCount; ++i, ++walk )
1164    {
1165        const tr_peer * peer = peers[i];
1166
1167        walk->in_addr = peer->in_addr;
1168
1169        walk->port = peer->port;
1170
1171        walk->flags = 0;
1172        if( peerPrefersCrypto(peer) )  walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1173        if( peer->progress >= 1.0 )    walk->flags |= ADDED_F_SEED_FLAG;
1174    }
1175
1176    assert( ( walk - pex ) == peerCount );
1177    qsort( pex, peerCount, sizeof(tr_pex), tr_pexCompare );
1178    *setme_pex = pex;
1179
1180    managerUnlock( manager );
1181
1182    return peerCount;
1183}
1184
1185static int reconnectPulse( void * vtorrent );
1186static int rechokePulse( void * vtorrent );
1187
1188void
1189tr_peerMgrStartTorrent( tr_peerMgr     * manager,
1190                        const uint8_t  * torrentHash )
1191{
1192    Torrent * t;
1193
1194    managerLock( manager );
1195
1196    t = getExistingTorrent( manager, torrentHash );
1197
1198    assert( t != NULL );
1199    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1200    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1201
1202    if( !t->isRunning )
1203    {
1204        t->isRunning = 1;
1205
1206        t->reconnectTimer = tr_timerNew( t->manager->handle,
1207                                         reconnectPulse, t,
1208                                         RECONNECT_PERIOD_MSEC );
1209
1210        t->rechokeTimer = tr_timerNew( t->manager->handle,
1211                                       rechokePulse, t,
1212                                       RECHOKE_PERIOD_MSEC );
1213
1214        reconnectPulse( t );
1215
1216        rechokePulse( t );
1217
1218        if( !tr_ptrArrayEmpty( t->webseeds ) )
1219            refillSoon( t );
1220    }
1221
1222    managerUnlock( manager );
1223}
1224
1225static void
1226stopTorrent( Torrent * t )
1227{
1228    assert( torrentIsLocked( t ) );
1229
1230    t->isRunning = 0;
1231    tr_timerFree( &t->rechokeTimer );
1232    tr_timerFree( &t->reconnectTimer );
1233
1234    /* disconnect the peers. */
1235    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)peerDestructor );
1236    tr_ptrArrayClear( t->peers );
1237
1238    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1239     * which removes the handshake from t->outgoingHandshakes... */
1240    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1241        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1242}
1243void
1244tr_peerMgrStopTorrent( tr_peerMgr     * manager,
1245                       const uint8_t  * torrentHash)
1246{
1247    managerLock( manager );
1248
1249    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1250
1251    managerUnlock( manager );
1252}
1253
1254void
1255tr_peerMgrAddTorrent( tr_peerMgr * manager,
1256                      tr_torrent * tor )
1257{
1258    Torrent * t;
1259
1260    managerLock( manager );
1261
1262    assert( tor != NULL );
1263    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1264
1265    t = torrentConstructor( manager, tor );
1266    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1267
1268    managerUnlock( manager );
1269}
1270
1271void
1272tr_peerMgrRemoveTorrent( tr_peerMgr     * manager,
1273                         const uint8_t  * torrentHash )
1274{
1275    Torrent * t;
1276
1277    managerLock( manager );
1278
1279    t = getExistingTorrent( manager, torrentHash );
1280    assert( t != NULL );
1281    stopTorrent( t );
1282    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1283    torrentDestructor( t );
1284
1285    managerUnlock( manager );
1286}
1287
1288void
1289tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1290                               const uint8_t    * torrentHash,
1291                               int8_t           * tab,
1292                               int                tabCount )
1293{
1294    int i;
1295    const Torrent * t;
1296    const tr_torrent * tor;
1297    float interval;
1298    int isComplete;
1299    int peerCount;
1300    const tr_peer ** peers;
1301
1302    managerLock( manager );
1303
1304    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1305    tor = t->tor;
1306    interval = tor->info.pieceCount / (float)tabCount;
1307    isComplete = tor && ( tr_cpGetStatus ( tor->completion ) == TR_CP_COMPLETE );
1308    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1309
1310    memset( tab, 0, tabCount );
1311
1312    for( i=0; tor && i<tabCount; ++i )
1313    {
1314        const int piece = i * interval;
1315
1316        if( isComplete || tr_cpPieceIsComplete( tor->completion, piece ) )
1317            tab[i] = -1;
1318        else if( peerCount ) {
1319            int j;
1320            for( j=0; j<peerCount; ++j )
1321                if( tr_bitfieldHas( peers[j]->have, i ) )
1322                    ++tab[i];
1323        }
1324    }
1325
1326    managerUnlock( manager );
1327}
1328
1329/* Returns the pieces that are available from peers */
1330tr_bitfield*
1331tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1332                        const uint8_t    * torrentHash )
1333{
1334    int i, size;
1335    Torrent * t;
1336    tr_peer ** peers;
1337    tr_bitfield * pieces;
1338    managerLock( manager );
1339
1340    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1341    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1342    peers = getConnectedPeers( t, &size );
1343    for( i=0; i<size; ++i )
1344        tr_bitfieldOr( pieces, peers[i]->have );
1345
1346    managerUnlock( manager );
1347    tr_free( peers );
1348    return pieces;
1349}
1350
1351int
1352tr_peerMgrHasConnections( const tr_peerMgr * manager,
1353                          const uint8_t    * torrentHash )
1354{
1355    int ret;
1356    const Torrent * t;
1357    managerLock( manager );
1358
1359    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1360    ret = t && ( !tr_ptrArrayEmpty( t->peers ) || !tr_ptrArrayEmpty( t->webseeds ) );
1361
1362    managerUnlock( manager );
1363    return ret;
1364}
1365
1366void
1367tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1368                        const uint8_t    * torrentHash,
1369                        int              * setmePeersKnown,
1370                        int              * setmePeersConnected,
1371                        int              * setmeSeedsConnected,
1372                        int              * setmeWebseedsSendingToUs,
1373                        int              * setmePeersSendingToUs,
1374                        int              * setmePeersGettingFromUs,
1375                        int              * setmePeersFrom )
1376{
1377    int i, size;
1378    const Torrent * t;
1379    const tr_peer ** peers;
1380    const tr_webseed ** webseeds;
1381
1382    managerLock( manager );
1383
1384    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1385    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1386
1387    *setmePeersKnown           = tr_ptrArraySize( t->pool );
1388    *setmePeersConnected       = 0;
1389    *setmeSeedsConnected       = 0;
1390    *setmePeersGettingFromUs   = 0;
1391    *setmePeersSendingToUs     = 0;
1392    *setmeWebseedsSendingToUs  = 0;
1393
1394    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1395        setmePeersFrom[i] = 0;
1396
1397    for( i=0; i<size; ++i )
1398    {
1399        const tr_peer * peer = peers[i];
1400        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1401
1402        if( peer->io == NULL ) /* not connected */
1403            continue;
1404
1405        ++*setmePeersConnected;
1406
1407        ++setmePeersFrom[atom->from];
1408
1409        if( clientIsDownloadingFrom( peer ) )
1410            ++*setmePeersSendingToUs;
1411
1412        if( clientIsUploadingTo( peer ) )
1413            ++*setmePeersGettingFromUs;
1414
1415        if( atom->flags & ADDED_F_SEED_FLAG )
1416            ++*setmeSeedsConnected;
1417    }
1418
1419    webseeds = (const tr_webseed **) tr_ptrArrayPeek( t->webseeds, &size );
1420    for( i=0; i<size; ++i )
1421    {
1422        if( tr_webseedIsActive( webseeds[i] ) )
1423            ++*setmeWebseedsSendingToUs;
1424    }
1425
1426    managerUnlock( manager );
1427}
1428
1429float*
1430tr_peerMgrWebSpeeds( const tr_peerMgr * manager,
1431                     const uint8_t    * torrentHash )
1432{
1433    const Torrent * t;
1434    const tr_webseed ** webseeds;
1435    int i;
1436    int webseedCount;
1437    float * ret;
1438
1439    assert( manager );
1440    managerLock( manager );
1441
1442    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1443    webseeds = (const tr_webseed**) tr_ptrArrayPeek( t->webseeds, &webseedCount );
1444    assert( webseedCount == t->tor->info.webseedCount );
1445    ret = tr_new0( float, webseedCount );
1446
1447    for( i=0; i<webseedCount; ++i )
1448        if( !tr_webseedGetSpeed( webseeds[i], &ret[i] ) )
1449            ret[i] = -1.0;
1450
1451    managerUnlock( manager );
1452    return ret;
1453}
1454
1455struct tr_peer_stat *
1456tr_peerMgrPeerStats( const tr_peerMgr  * manager,
1457                     const uint8_t     * torrentHash,
1458                     int               * setmeCount UNUSED )
1459{
1460    int i, size;
1461    const Torrent * t;
1462    tr_peer ** peers;
1463    tr_peer_stat * ret;
1464
1465    assert( manager );
1466    managerLock( manager );
1467
1468    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1469    peers = getConnectedPeers( (Torrent*)t, &size );
1470    ret = tr_new0( tr_peer_stat, size );
1471
1472    for( i=0; i<size; ++i )
1473    {
1474        char * pch;
1475        const tr_peer * peer = peers[i];
1476        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1477        tr_peer_stat * stat = ret + i;
1478
1479        tr_netNtop( &peer->in_addr, stat->addr, sizeof(stat->addr) );
1480        tr_strlcpy( stat->client, (peer->client ? peer->client : ""), sizeof(stat->client) );
1481        stat->port               = peer->port;
1482        stat->from               = atom->from;
1483        stat->progress           = peer->progress;
1484        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1485        stat->uploadToRate       = peer->rateToPeer;
1486        stat->downloadFromRate   = peer->rateToClient;
1487        stat->peerIsChoked       = peer->peerIsChoked;
1488        stat->peerIsInterested   = peer->peerIsInterested;
1489        stat->clientIsChoked     = peer->clientIsChoked;
1490        stat->clientIsInterested = peer->clientIsInterested;
1491        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1492        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1493        stat->isUploadingTo      = clientIsUploadingTo( peer );
1494
1495        pch = stat->flagStr;
1496        if( t->optimistic == peer ) *pch++ = 'O';
1497        if( stat->isDownloadingFrom ) *pch++ = 'D';
1498        else if( stat->clientIsInterested ) *pch++ = 'd';
1499        if( stat->isUploadingTo ) *pch++ = 'U';
1500        else if( stat->peerIsInterested ) *pch++ = 'u';
1501        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
1502        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1503        if( stat->isEncrypted ) *pch++ = 'E';
1504        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1505        if( stat->isIncoming ) *pch++ = 'I';
1506        *pch = '\0';
1507    }
1508
1509    *setmeCount = size;
1510    tr_free( peers );
1511
1512    managerUnlock( manager );
1513    return ret;
1514}
1515
1516/**
1517***
1518**/
1519
1520struct ChokeData
1521{
1522    uint8_t doUnchoke;
1523    uint8_t isInterested;
1524    uint32_t rate;
1525    tr_peer * peer;
1526};
1527
1528static int
1529compareChoke( const void * va, const void * vb )
1530{
1531    const struct ChokeData * a = va;
1532    const struct ChokeData * b = vb;
1533    return -tr_compareUint32( a->rate, b->rate );
1534}
1535
1536static int
1537isNew( const tr_peer * peer )
1538{
1539    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1540}
1541
1542static int
1543isSame( const tr_peer * peer )
1544{
1545    return peer && peer->client && strstr( peer->client, "Transmission" );
1546}
1547
1548/**
1549***
1550**/
1551
1552static int
1553getWeightedRate( const tr_peer * peer, int clientIsSeed )
1554{
1555    return (int)( 10.0 * ( clientIsSeed ? peer->rateToPeer
1556                                        : peer->rateToClient ) );
1557}
1558
1559static void
1560rechoke( Torrent * t )
1561{
1562    int i, peerCount, size, unchokedInterested;
1563    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1564    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1565    const int clientIsSeed = tr_torrentIsSeed( t->tor );
1566
1567    assert( torrentIsLocked( t ) );
1568   
1569    /* sort the peers by preference and rate */
1570    for( i=0, size=0; i<peerCount; ++i )
1571    {
1572        tr_peer * peer = peers[i];
1573        if( peer->progress >= 1.0 ) /* choke all seeds */
1574            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1575        else {
1576            struct ChokeData * node = &choke[size++];
1577            node->peer = peer;
1578            node->isInterested = peer->peerIsInterested;
1579            node->rate = getWeightedRate( peer, clientIsSeed );
1580        }
1581    }
1582
1583    qsort( choke, size, sizeof(struct ChokeData), compareChoke );
1584
1585    /**
1586     * Reciprocation and number of uploads capping is managed by unchoking
1587     * the N peers which have the best upload rate and are interested.
1588     * This maximizes the client's download rate. These N peers are
1589     * referred to as downloaders, because they are interested in downloading
1590     * from the client.
1591     *
1592     * Peers which have a better upload rate (as compared to the downloaders)
1593     * but aren't interested get unchoked. If they become interested, the
1594     * downloader with the worst upload rate gets choked. If a client has
1595     * a complete file, it uses its upload rate rather than its download
1596     * rate to decide which peers to unchoke.
1597     */
1598    unchokedInterested = 0;
1599    for( i=0; i<size && unchokedInterested<MAX_UNCHOKED_PEERS; ++i ) {
1600        choke[i].doUnchoke = 1;
1601        if( choke[i].isInterested )
1602            ++unchokedInterested;
1603    }
1604
1605    /* optimistic unchoke */
1606    if( i < size )
1607    {
1608        int n;
1609        struct ChokeData * c;
1610        tr_ptrArray * randPool = tr_ptrArrayNew( );
1611
1612        for( ; i<size; ++i )
1613        {
1614            if( choke[i].isInterested )
1615            {
1616                const tr_peer * peer = choke[i].peer;
1617                int x=1, y;
1618                if( isNew( peer ) ) x *= 3;
1619                if( isSame( peer ) ) x *= 3;
1620                for( y=0; y<x; ++y )
1621                    tr_ptrArrayAppend( randPool, choke );
1622            }
1623        }
1624
1625        if(( n = tr_ptrArraySize( randPool )))
1626        {
1627            c = tr_ptrArrayNth( randPool, tr_rand( n ));
1628            c->doUnchoke = 1;
1629            t->optimistic = c->peer;
1630        }
1631
1632        tr_ptrArrayFree( randPool, NULL );
1633    }
1634
1635    for( i=0; i<size; ++i )
1636        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1637
1638    /* cleanup */
1639    tr_free( choke );
1640    tr_free( peers );
1641}
1642
1643static int
1644rechokePulse( void * vtorrent )
1645{
1646    Torrent * t = vtorrent;
1647    torrentLock( t );
1648    rechoke( t );
1649    torrentUnlock( t );
1650    return TRUE;
1651}
1652
1653/***
1654****
1655****  Life and Death
1656****
1657***/
1658
1659static int
1660shouldPeerBeClosed( const Torrent * t, const tr_peer * peer, int peerCount )
1661{
1662    const tr_torrent * tor = t->tor;
1663    const time_t now = time( NULL );
1664    const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1665
1666    /* if it's marked for purging, close it */
1667    if( peer->doPurge ) {
1668        tordbg( t, "purging peer %s because its doPurge flag is set", tr_peerIoAddrStr(&atom->addr,atom->port) );
1669        return TRUE;
1670    }
1671
1672    /* if we're seeding and the peer has everything we have,
1673     * and enough time has passed for a pex exchange, then disconnect */
1674    if( tr_torrentIsSeed( tor ) ) {
1675        int peerHasEverything;
1676        if( atom->flags & ADDED_F_SEED_FLAG )
1677            peerHasEverything = TRUE;
1678        else if( peer->progress < tr_cpPercentDone( tor->completion ) )
1679            peerHasEverything = FALSE;
1680        else {
1681            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( tor->completion ) );
1682            tr_bitfieldDifference( tmp, peer->have );
1683            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
1684            tr_bitfieldFree( tmp );
1685        }
1686        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30) ) ) {
1687            tordbg( t, "purging peer %s because we're both seeds", tr_peerIoAddrStr(&atom->addr,atom->port) );
1688            return TRUE;
1689        }
1690    }
1691
1692    /* disconnect if it's been too long since piece data has been transferred.
1693     * this is on a sliding scale based on number of available peers... */
1694    {
1695        const int relaxStrictnessIfFewerThanN = (int)((getMaxPeerCount(tor) * 0.9) + 0.5);
1696        /* if we have >= relaxIfFewerThan, strictness is 100%.
1697         * if we have zero connections, strictness is 0% */
1698        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
1699            ? 1.0
1700            : peerCount / (float)relaxStrictnessIfFewerThanN;
1701        const int lo = MIN_UPLOAD_IDLE_SECS;
1702        const int hi = MAX_UPLOAD_IDLE_SECS;
1703        const int limit = lo + ((hi-lo) * strictness);
1704        const time_t then = peer->pieceDataActivityDate;
1705        const int idleTime = then ? (now-then) : 0;
1706        if( idleTime > limit ) {
1707            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
1708                       tr_peerIoAddrStr(&atom->addr,atom->port), idleTime );
1709            return TRUE;
1710        }
1711    }
1712
1713    return FALSE;
1714}
1715
1716static tr_peer **
1717getPeersToClose( Torrent * t, int * setmeSize )
1718{
1719    int i, peerCount, outsize;
1720    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( t->peers, &peerCount );
1721    struct tr_peer ** ret = tr_new( tr_peer*, peerCount );
1722
1723    assert( torrentIsLocked( t ) );
1724
1725    for( i=outsize=0; i<peerCount; ++i )
1726        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
1727            ret[outsize++] = peers[i];
1728
1729    *setmeSize = outsize;
1730    return ret;
1731}
1732
1733static int
1734compareCandidates( const void * va, const void * vb )
1735{
1736    const struct peer_atom * a = * (const struct peer_atom**) va;
1737    const struct peer_atom * b = * (const struct peer_atom**) vb;
1738    int i;
1739
1740    if( a->piece_data_time > b->piece_data_time ) return -1;
1741    if( a->piece_data_time < b->piece_data_time ) return  1;
1742
1743    if(( i = tr_compareUint16( a->numFails, b->numFails )))
1744        return i;
1745
1746    if( a->time != b->time )
1747        return a->time < b->time ? -1 : 1;
1748
1749    return 0;
1750}
1751
1752static struct peer_atom **
1753getPeerCandidates( Torrent * t, int * setmeSize )
1754{
1755    int i, atomCount, retCount;
1756    struct peer_atom ** atoms;
1757    struct peer_atom ** ret;
1758    const time_t now = time( NULL );
1759    const int seed = tr_torrentIsSeed( t->tor );
1760
1761    assert( torrentIsLocked( t ) );
1762
1763    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &atomCount );
1764    ret = tr_new( struct peer_atom*, atomCount );
1765    for( i=retCount=0; i<atomCount; ++i )
1766    {
1767        struct peer_atom * atom = atoms[i];
1768
1769        /* peer fed us too much bad data ... we only keep it around
1770         * now to weed it out in case someone sends it to us via pex */
1771        if( atom->myflags & MYFLAG_BANNED )
1772            continue;
1773
1774        /* peer was unconnectable before, so we're not going to keep trying.
1775         * this is needs a separate flag from `banned', since if they try
1776         * to connect to us later, we'll let them in */
1777        if( atom->myflags & MYFLAG_UNREACHABLE )
1778            continue;
1779
1780        /* we don't need two connections to the same peer... */
1781        if( peerIsInUse( t, &atom->addr ) )
1782            continue;
1783
1784        /* no need to connect if we're both seeds... */
1785        if( seed && (atom->flags & ADDED_F_SEED_FLAG) )
1786            continue;
1787
1788        /* we're wasting our time trying to connect to this bozo. */
1789        if( atom->numFails > 3 )
1790            continue;
1791
1792        /* If we were connected to this peer recently and transferring
1793         * piece data, try to reconnect -- network troubles may have
1794         * disconnected us.  but if we weren't sharing piece data,
1795         * hold off on this peer to give another one a try instead */
1796        if( ( now - atom->piece_data_time ) > 30 )
1797        {
1798            int minWait = (60 * 10); /* ten minutes */
1799            int maxWait = (60 * 30); /* thirty minutes */
1800            int wait = atom->numFails * minWait;
1801            if( wait < minWait ) wait = minWait;
1802            if( wait > maxWait ) wait = maxWait;
1803            if( ( now - atom->time ) < wait ) {
1804                tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
1805                        i, tr_peerIoAddrStr(&atom->addr,atom->port), wait );
1806                continue;
1807            }
1808        }
1809
1810        /* Don't connect to peers in our blocklist */
1811        if( tr_sessionIsAddressBlocked( t->manager->handle, &atom->addr ) )
1812            continue;
1813
1814        ret[retCount++] = atom;
1815    }
1816
1817    qsort( ret, retCount, sizeof(struct peer_atom*), compareCandidates );
1818    *setmeSize = retCount;
1819    return ret;
1820}
1821
1822static int
1823reconnectPulse( void * vtorrent )
1824{
1825    Torrent * t = vtorrent;
1826    static time_t prevTime = 0;
1827    static int newConnectionsThisSecond = 0;
1828    time_t now;
1829
1830    torrentLock( t );
1831
1832    now = time( NULL );
1833    if( prevTime != now )
1834    {
1835        prevTime = now;
1836        newConnectionsThisSecond = 0;
1837    }
1838
1839    if( !t->isRunning )
1840    {
1841        removeAllPeers( t );
1842    }
1843    else
1844    {
1845        int i, nCandidates, nBad;
1846        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
1847        struct tr_peer ** connections = getPeersToClose( t, &nBad );
1848
1849        if( nBad || nCandidates )
1850            tordbg( t, "reconnect pulse for [%s]: %d bad connections, "
1851                       "%d connection candidates, %d atoms, max per pulse is %d",
1852                       t->tor->info.name, nBad, nCandidates,
1853                       tr_ptrArraySize(t->pool),
1854                       (int)MAX_RECONNECTIONS_PER_PULSE );
1855
1856        /* disconnect some peers.
1857           if we got transferred piece data, then they might be good peers,
1858           so reset their `numFails' weight to zero.  otherwise we connected
1859           to them fruitlessly, so mark it as another fail */
1860        for( i=0; i<nBad; ++i ) {
1861            tr_peer * peer = connections[i];
1862            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1863            if( peer->pieceDataActivityDate )
1864                atom->numFails = 0;
1865            else
1866                ++atom->numFails;
1867            removePeer( t, peer );
1868        }
1869
1870        /* add some new ones */
1871        for( i=0;    i < nCandidates
1872                  && i < MAX_RECONNECTIONS_PER_PULSE
1873                  && newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND; ++i )
1874        {
1875            tr_peerMgr * mgr = t->manager;
1876            struct peer_atom * atom = candidates[i];
1877            tr_peerIo * io;
1878
1879            tordbg( t, "Starting an OUTGOING connection with %s",
1880                       tr_peerIoAddrStr( &atom->addr, atom->port ) );
1881
1882            io = tr_peerIoNewOutgoing( mgr->handle, &atom->addr, atom->port, t->hash );
1883            if( io == NULL )
1884            {
1885                atom->myflags |= MYFLAG_UNREACHABLE;
1886            }
1887            else
1888            {
1889                tr_handshake * handshake = tr_handshakeNew( io,
1890                                                            mgr->handle->encryptionMode,
1891                                                            myHandshakeDoneCB,
1892                                                            mgr );
1893
1894                assert( tr_peerIoGetTorrentHash( io ) != NULL );
1895
1896                ++newConnectionsThisSecond;
1897
1898                tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake, handshakeCompare );
1899            }
1900
1901            atom->time = time( NULL );
1902        }
1903
1904        /* cleanup */
1905        tr_free( connections );
1906        tr_free( candidates );
1907    }
1908
1909    torrentUnlock( t );
1910    return TRUE;
1911}
Note: See TracBrowser for help on using the repository browser.