source: trunk/libtransmission/peer-mgr.c @ 6103

Last change on this file since 6103 was 6103, checked in by charles, 14 years ago

(libT) make tr_torrentAvailability() a lot faster in the case where the torrent is complete, and a little faster in the genral case too.

  • Property svn:keywords set to Date Rev Author Id
File size: 53.3 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 6103 2008-06-09 23:11:15Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp, strstr */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "blocklist.h"
23#include "clients.h"
24#include "completion.h"
25#include "crypto.h"
26#include "handshake.h"
27#include "inout.h" /* tr_ioTestPiece */
28#include "net.h"
29#include "peer-io.h"
30#include "peer-mgr.h"
31#include "peer-mgr-private.h"
32#include "peer-msgs.h"
33#include "ptrarray.h"
34#include "ratecontrol.h"
35#include "stats.h" /* tr_statsAddDownloaded */
36#include "torrent.h"
37#include "trevent.h"
38#include "utils.h"
39#include "webseed.h"
40
41enum
42{
43    /* how frequently to change which peers are choked */
44    RECHOKE_PERIOD_MSEC = (10 * 1000),
45
46    /* how frequently to refill peers' request lists */
47    REFILL_PERIOD_MSEC = 666,
48
49    /* following the BT spec, we consider ourselves `snubbed' if
50     * we're we don't get piece data from a peer in this long */
51    SNUBBED_SEC = 60,
52
53    /* when many peers are available, keep idle ones this long */
54    MIN_UPLOAD_IDLE_SECS = (60 * 3),
55
56    /* when few peers are available, keep idle ones this long */
57    MAX_UPLOAD_IDLE_SECS = (60 * 10),
58
59    /* how frequently to decide which peers live and die */
60    RECONNECT_PERIOD_MSEC = (2 * 1000),
61
62    /* max # of peers to ask fer per torrent per reconnect pulse */
63    MAX_RECONNECTIONS_PER_PULSE = 1,
64
65    /* max number of peers to ask for per second overall.
66     * this throttle is to avoid overloading the router */
67    MAX_CONNECTIONS_PER_SECOND = 8,
68
69    /* number of unchoked peers per torrent */
70    MAX_UNCHOKED_PEERS = 12,
71
72    /* number of bad pieces a peer is allowed to send before we ban them */
73    MAX_BAD_PIECES_PER_PEER = 3,
74
75    /* use for bitwise operations w/peer_atom.myflags */
76    MYFLAG_BANNED = 1,
77
78    /* unreachable for now... but not banned.  if they try to connect to us it's okay */
79    MYFLAG_UNREACHABLE = 2
80};
81
82
83/**
84***
85**/
86
87/* We keep one of these for every peer we know about, whether
88 * it's connected or not, so the struct must be small.
89 * When our current connections underperform, we dip back
90 * into this list for new ones. */
91struct peer_atom
92{   
93    uint8_t from;
94    uint8_t flags; /* these match the added_f flags */
95    uint8_t myflags; /* flags that aren't defined in added_f */
96    uint16_t port;
97    uint16_t numFails;
98    struct in_addr addr;
99    time_t time;
100    time_t piece_data_time;
101};
102
103typedef struct
104{
105    uint8_t hash[SHA_DIGEST_LENGTH];
106    tr_ptrArray * outgoingHandshakes; /* tr_handshake */
107    tr_ptrArray * pool; /* struct peer_atom */
108    tr_ptrArray * peers; /* tr_peer */
109    tr_ptrArray * webseeds; /* tr_webseed */
110    tr_timer * reconnectTimer;
111    tr_timer * rechokeTimer;
112    tr_timer * refillTimer;
113    tr_torrent * tor;
114    tr_peer * optimistic; /* the optimistic peer, or NULL if none */
115    tr_bitfield * requestedPieces;
116
117    unsigned int isRunning : 1;
118
119    struct tr_peerMgr * manager;
120}
121Torrent;
122
123struct tr_peerMgr
124{
125    tr_handle * handle;
126    tr_ptrArray * torrents; /* Torrent */
127    tr_ptrArray * incomingHandshakes; /* tr_handshake */
128};
129
130#define tordbg(t, fmt...) tr_deepLog( __FILE__, __LINE__, t->tor->info.name, ##fmt )
131
132/**
133***
134**/
135
136static void
137managerLock( const struct tr_peerMgr * manager )
138{
139    tr_globalLock( manager->handle );
140}
141static void
142managerUnlock( const struct tr_peerMgr * manager )
143{
144    tr_globalUnlock( manager->handle );
145}
146static void
147torrentLock( Torrent * torrent )
148{
149    managerLock( torrent->manager );
150}
151static void
152torrentUnlock( Torrent * torrent )
153{
154    managerUnlock( torrent->manager );
155}
156static int
157torrentIsLocked( const Torrent * t )
158{
159    return tr_globalIsLocked( t->manager->handle );
160}
161
162/**
163***
164**/
165
166static int
167compareAddresses( const struct in_addr * a, const struct in_addr * b )
168{
169    return tr_compareUint32( a->s_addr, b->s_addr );
170}
171
172static int
173handshakeCompareToAddr( const void * va, const void * vb )
174{
175    const tr_handshake * a = va;
176    return compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
177}
178
179static int
180handshakeCompare( const void * a, const void * b )
181{
182    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
183}
184
185static tr_handshake*
186getExistingHandshake( tr_ptrArray * handshakes, const struct in_addr * in_addr )
187{
188    return tr_ptrArrayFindSorted( handshakes,
189                                  in_addr,
190                                  handshakeCompareToAddr );
191}
192
193static int
194comparePeerAtomToAddress( const void * va, const void * vb )
195{
196    const struct peer_atom * a = va;
197    return compareAddresses( &a->addr, vb );
198}
199
200static int
201comparePeerAtoms( const void * va, const void * vb )
202{
203    const struct peer_atom * b = vb;
204    return comparePeerAtomToAddress( va, &b->addr );
205}
206
207/**
208***
209**/
210
211static int
212torrentCompare( const void * va, const void * vb )
213{
214    const Torrent * a = va;
215    const Torrent * b = vb;
216    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
217}
218
219static int
220torrentCompareToHash( const void * va, const void * vb )
221{
222    const Torrent * a = va;
223    const uint8_t * b_hash = vb;
224    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
225}
226
227static Torrent*
228getExistingTorrent( tr_peerMgr * manager, const uint8_t * hash )
229{
230    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
231                                             hash,
232                                             torrentCompareToHash );
233}
234
235static int
236peerCompare( const void * va, const void * vb )
237{
238    const tr_peer * a = va;
239    const tr_peer * b = vb;
240    return compareAddresses( &a->in_addr, &b->in_addr );
241}
242
243static int
244peerCompareToAddr( const void * va, const void * vb )
245{
246    const tr_peer * a = va;
247    return compareAddresses( &a->in_addr, vb );
248}
249
250static tr_peer*
251getExistingPeer( Torrent * torrent, const struct in_addr * in_addr )
252{
253    assert( torrentIsLocked( torrent ) );
254    assert( in_addr != NULL );
255
256    return (tr_peer*) tr_ptrArrayFindSorted( torrent->peers,
257                                             in_addr,
258                                             peerCompareToAddr );
259}
260
261static struct peer_atom*
262getExistingAtom( const Torrent * t, const struct in_addr * addr )
263{
264    assert( torrentIsLocked( t ) );
265    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
266}
267
268static int
269peerIsInUse( const Torrent * ct, const struct in_addr * addr )
270{
271    Torrent * t = (Torrent*) ct;
272
273    assert( torrentIsLocked ( t ) );
274
275    return getExistingPeer( t, addr )
276        || getExistingHandshake( t->outgoingHandshakes, addr )
277        || getExistingHandshake( t->manager->incomingHandshakes, addr );
278}
279
280static tr_peer*
281peerConstructor( const struct in_addr * in_addr )
282{
283    tr_peer * p;
284    p = tr_new0( tr_peer, 1 );
285    p->rcToClient = tr_rcInit( );
286    p->rcToPeer = tr_rcInit( );
287    memcpy( &p->in_addr, in_addr, sizeof(struct in_addr) );
288    return p;
289}
290
291static tr_peer*
292getPeer( Torrent * torrent, const struct in_addr * in_addr )
293{
294    tr_peer * peer;
295
296    assert( torrentIsLocked( torrent ) );
297
298    peer = getExistingPeer( torrent, in_addr );
299
300    if( peer == NULL ) {
301        peer = peerConstructor( in_addr );
302        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
303    }
304
305    return peer;
306}
307
308static void
309peerDestructor( tr_peer * peer )
310{
311    assert( peer != NULL );
312    assert( peer->msgs != NULL );
313
314    tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
315    tr_peerMsgsFree( peer->msgs );
316
317    tr_peerIoFree( peer->io );
318
319    tr_bitfieldFree( peer->have );
320    tr_bitfieldFree( peer->blame );
321    tr_rcClose( peer->rcToClient );
322    tr_rcClose( peer->rcToPeer );
323    tr_free( peer->client );
324    tr_free( peer );
325}
326
327static void
328removePeer( Torrent * t, tr_peer * peer )
329{
330    tr_peer * removed;
331    struct peer_atom * atom;
332
333    assert( torrentIsLocked( t ) );
334
335    atom = getExistingAtom( t, &peer->in_addr );
336    assert( atom != NULL );
337    atom->time = time( NULL );
338
339    removed = tr_ptrArrayRemoveSorted( t->peers, peer, peerCompare );
340    assert( removed == peer );
341    peerDestructor( removed );
342}
343
344static void
345removeAllPeers( Torrent * t )
346{
347    while( !tr_ptrArrayEmpty( t->peers ) )
348        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
349}
350
351static void
352torrentDestructor( Torrent * t )
353{
354    uint8_t hash[SHA_DIGEST_LENGTH];
355
356    assert( t != NULL );
357    assert( !t->isRunning );
358    assert( t->peers != NULL );
359    assert( torrentIsLocked( t ) );
360    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
361    assert( tr_ptrArrayEmpty( t->peers ) );
362
363    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
364
365    tr_timerFree( &t->reconnectTimer );
366    tr_timerFree( &t->rechokeTimer );
367    tr_timerFree( &t->refillTimer );
368
369    tr_bitfieldFree( t->requestedPieces );
370    tr_ptrArrayFree( t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
371    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
372    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
373    tr_ptrArrayFree( t->peers, NULL );
374
375    tr_free( t );
376}
377
378static void peerCallbackFunc( void * vpeer, void * vevent, void * vt );
379
380static Torrent*
381torrentConstructor( tr_peerMgr * manager, tr_torrent * tor )
382{
383    int i;
384    Torrent * t;
385
386    t = tr_new0( Torrent, 1 );
387    t->manager = manager;
388    t->tor = tor;
389    t->pool = tr_ptrArrayNew( );
390    t->peers = tr_ptrArrayNew( );
391    t->webseeds = tr_ptrArrayNew( );
392    t->outgoingHandshakes = tr_ptrArrayNew( );
393    t->requestedPieces = tr_bitfieldNew( tor->info.pieceCount );
394    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
395
396    for( i=0; i<tor->info.webseedCount; ++i ) {
397        tr_webseed * w = tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
398        tr_ptrArrayAppend( t->webseeds, w );
399    }
400
401    return t;
402}
403
404/**
405 * For explanation, see http://www.bittorrent.org/fast_extensions.html
406 * Also see the "test-allowed-set" unit test
407 */
408struct tr_bitfield *
409tr_peerMgrGenerateAllowedSet( const uint32_t         k,         /* number of pieces in set */
410                              const uint32_t         sz,        /* number of pieces in torrent */
411                              const uint8_t        * infohash,  /* torrent's SHA1 hash*/
412                              const struct in_addr * ip )       /* peer's address */
413{
414    uint8_t w[SHA_DIGEST_LENGTH + 4];
415    uint8_t x[SHA_DIGEST_LENGTH];
416    tr_bitfield_t * a;
417    uint32_t a_size;
418
419    *(uint32_t*)w = ntohl( htonl(ip->s_addr) & 0xffffff00 );   /* (1) */
420    memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );              /* (2) */
421    tr_sha1( x, w, sizeof( w ), NULL );                        /* (3) */
422
423    a = tr_bitfieldNew( sz );
424    a_size = 0;
425   
426    while( a_size < k )
427    {
428        int i;
429        for ( i=0; i<5 && a_size<k; ++i )                      /* (4) */
430        {
431            uint32_t j = i * 4;                                /* (5) */
432            uint32_t y = ntohl(*(uint32_t*)(x+j));             /* (6) */
433            uint32_t index = y % sz;                           /* (7) */
434            if ( !tr_bitfieldHas( a, index ) ) {               /* (8) */
435                tr_bitfieldAdd( a, index );                    /* (9) */
436                ++a_size;
437            }
438        }
439        tr_sha1( x, x, sizeof( x ), NULL );                    /* (3) */
440    }
441   
442    return a;
443}
444
445tr_peerMgr*
446tr_peerMgrNew( tr_handle * handle )
447{
448    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
449    m->handle = handle;
450    m->torrents = tr_ptrArrayNew( );
451    m->incomingHandshakes = tr_ptrArrayNew( );
452    return m;
453}
454
455void
456tr_peerMgrFree( tr_peerMgr * manager )
457{
458    managerLock( manager );
459
460    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
461     * the item from manager->handshakes, so this is a little roundabout... */
462    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
463        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
464    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
465
466    /* free the torrents. */
467    tr_ptrArrayFree( manager->torrents, (PtrArrayForeachFunc)torrentDestructor );
468
469    managerUnlock( manager );
470    tr_free( manager );
471}
472
473static tr_peer**
474getConnectedPeers( Torrent * t, int * setmeCount )
475{
476    int i, peerCount, connectionCount;
477    tr_peer **peers;
478    tr_peer **ret;
479
480    assert( torrentIsLocked( t ) );
481
482    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
483    ret = tr_new( tr_peer*, peerCount );
484
485    for( i=connectionCount=0; i<peerCount; ++i )
486        if( peers[i]->msgs != NULL )
487            ret[connectionCount++] = peers[i];
488
489    *setmeCount = connectionCount;
490    return ret;
491}
492
493static int
494clientIsDownloadingFrom( const tr_peer * peer )
495{
496    return peer->clientIsInterested && !peer->clientIsChoked;
497}
498
499static int
500clientIsUploadingTo( const tr_peer * peer )
501{
502    return peer->peerIsInterested && !peer->peerIsChoked;
503}
504
505/***
506****
507***/
508
509int
510tr_peerMgrPeerIsSeed( const tr_peerMgr       * mgr,
511                      const uint8_t          * torrentHash,
512                      const struct in_addr   * addr )
513{
514    int isSeed = FALSE;
515    const Torrent * t = NULL;
516    const struct peer_atom * atom = NULL;
517
518    t = getExistingTorrent( (tr_peerMgr*)mgr, torrentHash );
519    if( t )
520        atom = getExistingAtom( t, addr );
521    if( atom )
522        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
523
524    return isSeed;
525}
526
527/***
528****  Refill
529***/
530
531struct tr_refill_piece
532{
533    tr_priority_t priority;
534    int missingBlockCount;
535    uint16_t random;
536    uint32_t piece;
537    uint32_t peerCount;
538};
539
540static int
541compareRefillPiece (const void * aIn, const void * bIn)
542{
543    const struct tr_refill_piece * a = aIn;
544    const struct tr_refill_piece * b = bIn;
545
546    /* if one piece has a higher priority, it goes first */
547    if( a->priority != b->priority )
548        return a->priority > b->priority ? -1 : 1;
549   
550    /* otherwise if one has fewer peers, it goes first */
551    if (a->peerCount != b->peerCount)
552        return a->peerCount < b->peerCount ? -1 : 1;
553
554    /* otherwise go with our random seed */
555    return tr_compareUint16( a->random, b->random );
556}
557
558static int
559isPieceInteresting( const tr_torrent  * tor,
560                    tr_piece_index_t    piece )
561{
562    if( tor->info.pieces[piece].dnd ) /* we don't want it */
563        return 0;
564
565    if( tr_cpPieceIsComplete( tor->completion, piece ) ) /* we have it */
566        return 0;
567
568    return 1;
569}
570
571static uint32_t*
572getPreferredPieces( Torrent     * t,
573                    uint32_t    * pieceCount )
574{
575    const tr_torrent * tor = t->tor;
576    const tr_info * inf = &tor->info;
577    tr_piece_index_t i;
578    uint32_t poolSize = 0;
579    uint32_t * pool = tr_new( uint32_t, inf->pieceCount );
580    int peerCount;
581    tr_peer** peers;
582
583    assert( torrentIsLocked( t ) );
584
585    peers = getConnectedPeers( t, &peerCount );
586
587    for( i=0; i<inf->pieceCount; ++i )
588        if( isPieceInteresting( tor, i ) )
589            pool[poolSize++] = i;
590
591    /* sort the pool from most interesting to least... */
592    if( poolSize > 1 )
593    {
594        uint32_t j;
595        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
596
597        for( j=0; j<poolSize; ++j )
598        {
599            int k;
600            const int piece = pool[j];
601            struct tr_refill_piece * setme = p + j;
602
603            setme->piece = piece;
604            setme->priority = inf->pieces[piece].priority;
605            setme->peerCount = 0;
606            setme->random = tr_rand( UINT16_MAX );
607            setme->missingBlockCount = tr_cpMissingBlocksInPiece( tor->completion, piece );
608
609            for( k=0; k<peerCount; ++k ) {
610                const tr_peer * peer = peers[k];
611                if( peer->peerIsInterested && !peer->clientIsChoked && tr_bitfieldHas( peer->have, piece ) )
612                    ++setme->peerCount;
613            }
614        }
615
616        qsort( p, poolSize, sizeof(struct tr_refill_piece), compareRefillPiece );
617
618        for( j=0; j<poolSize; ++j )
619            pool[j] = p[j].piece;
620
621        tr_free( p );
622    }
623
624    tr_free( peers );
625
626    *pieceCount = poolSize;
627    return pool;
628}
629
630static tr_peer**
631getPeersUploadingToClient( Torrent * t, int * setmeCount )
632{
633    int i;
634    int peerCount = 0;
635    int retCount = 0;
636    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
637    tr_peer ** ret = tr_new( tr_peer*, peerCount );
638
639    /* get a list of peers we're downloading from */
640    for( i=0; i<peerCount; ++i )
641        if( clientIsDownloadingFrom( peers[i] ) )
642            ret[retCount++] = peers[i];
643
644    /* pick a different starting point each time so all peers
645     * get a chance at the first blocks in the queue */
646    if( retCount ) {
647        tr_peer ** tmp = tr_new( tr_peer*, retCount );
648        i = tr_rand( retCount );
649        memcpy( tmp, ret, sizeof(tr_peer*) * retCount );
650        memcpy( ret, tmp+i, sizeof(tr_peer*) * (retCount-i) );
651        memcpy( ret+(retCount-i), tmp, sizeof(tr_peer*) * i );
652        tr_free( tmp );
653    }
654
655    *setmeCount = retCount;
656    return ret;
657}
658
659static int
660refillPulse( void * vtorrent )
661{
662    Torrent * t = vtorrent;
663    tr_torrent * tor = t->tor;
664    int peerCount;
665    int webseedCount;
666    tr_peer ** peers;
667    tr_webseed ** webseeds;
668    uint32_t pieceCount;
669    uint32_t * pieces;
670    tr_piece_index_t i;
671
672    if( !t->isRunning )
673        return TRUE;
674    if( tr_torrentIsSeed( t->tor ) )
675        return TRUE;
676
677    torrentLock( t );
678    tordbg( t, "Refilling Request Buffers..." );
679
680    pieces = getPreferredPieces( t, &pieceCount );
681    peers = getPeersUploadingToClient( t, &peerCount );
682    webseedCount = tr_ptrArraySize( t->webseeds );
683    webseeds = tr_memdup( tr_ptrArrayBase(t->webseeds), webseedCount*sizeof(tr_webseed*) );
684
685    for( i=0; (webseedCount || peerCount) && i<pieceCount; ++i )
686    {
687        int j;
688        int handled = FALSE;
689        const tr_piece_index_t piece = pieces[i];
690
691        assert( piece < tor->info.pieceSize );
692
693        /* find a peer who can ask for this piece */
694        for( j=0; !handled && j<peerCount; )
695        {
696            const int val = tr_peerMsgsAddRequest( peers[j]->msgs, piece );
697            switch( val )
698            {
699                case TR_ADDREQ_FULL: 
700                case TR_ADDREQ_CLIENT_CHOKED:
701                    memmove( peers+j, peers+j+1, sizeof(tr_peer*)*(--peerCount-j) );
702                    break;
703                case TR_ADDREQ_MISSING: 
704                case TR_ADDREQ_DUPLICATE: 
705                    ++j;
706                    break;
707                case TR_ADDREQ_OK:
708                    tr_bitfieldAdd( t->requestedPieces, piece );
709                    handled = TRUE;
710                    break;
711                default:
712                    assert( 0 && "unhandled value" );
713                    break;
714            }
715        }
716
717        /* maybe one of the webseeds can do it */
718        for( j=0; !handled && j<webseedCount; )
719        {
720            const int val = tr_webseedAddRequest( webseeds[j], piece );
721            switch( val )
722            {
723                case TR_ADDREQ_FULL: 
724                    memmove( webseeds+j, webseeds+j+1, sizeof(tr_webseed*)*(--webseedCount-j) );
725                    break;
726                case TR_ADDREQ_OK:
727                    tr_bitfieldAdd( t->requestedPieces, piece );
728                    handled = TRUE;
729                    break;
730                default:
731                    assert( 0 && "unhandled value" );
732                    break;
733            }
734        }
735    }
736
737    /* cleanup */
738    tr_free( webseeds );
739    tr_free( peers );
740    tr_free( pieces );
741
742    t->refillTimer = NULL;
743    torrentUnlock( t );
744    return FALSE;
745}
746
747static void
748addStrike( Torrent * t, tr_peer * peer )
749{
750    tordbg( t, "increasing peer %s strike count to %d", tr_peerIoAddrStr(&peer->in_addr,peer->port), peer->strikes+1 );
751
752    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
753    {
754        struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
755        atom->myflags |= MYFLAG_BANNED;
756        peer->doPurge = 1;
757        tordbg( t, "banning peer %s", tr_peerIoAddrStr(&atom->addr,atom->port) );
758    }
759}
760
761static void
762gotBadPiece( Torrent * t, tr_piece_index_t pieceIndex )
763{
764    tr_torrent * tor = t->tor;
765    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
766    tor->corruptCur += byteCount;
767    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
768}
769
770static void
771peerCallbackFunc( void * vpeer, void * vevent, void * vt )
772{
773    tr_peer * peer = vpeer; /* may be NULL if peer is a webseed */
774    Torrent * t = (Torrent *) vt;
775    const tr_peer_event * e = vevent;
776
777    torrentLock( t );
778
779    switch( e->eventType )
780    {
781        case TR_PEER_NEED_REQ:
782            if( t->refillTimer == NULL )
783                t->refillTimer = tr_timerNew( t->manager->handle,
784                                              refillPulse, t,
785                                              REFILL_PERIOD_MSEC );
786            break;
787
788        case TR_PEER_CANCEL:
789            tr_bitfieldRem( t->requestedPieces, e->pieceIndex );
790            break;
791
792        case TR_PEER_PEER_GOT_DATA: {
793            const time_t now = time( NULL );
794            tr_torrent * tor = t->tor;
795            tor->activityDate = now;
796            tor->uploadedCur += e->length;
797            tr_rcTransferred( tor->upload, e->length );
798            tr_rcTransferred( tor->handle->upload, e->length );
799            tr_statsAddUploaded( tor->handle, e->length );
800            if( peer ) {
801                struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
802                atom->piece_data_time = time( NULL );
803            }
804            break;
805        }
806
807        case TR_PEER_CLIENT_GOT_DATA: {
808            const time_t now = time( NULL );
809            tr_torrent * tor = t->tor;
810            tor->activityDate = now;
811            /* only add this to downloadedCur if we got it from a peer --
812             * webseeds shouldn't count against our ratio.  As one tracker
813             * admin put it, "Those pieces are downloaded directly from the
814             * content distributor, not the peers, it is the tracker's job
815             * to manage the swarms, not the web server and does not fit
816             * into the jurisdiction of the tracker." */
817            if( peer )
818                tor->downloadedCur += e->length;
819            tr_rcTransferred( tor->download, e->length );
820            tr_rcTransferred( tor->handle->download, e->length );
821            tr_statsAddDownloaded( tor->handle, e->length );
822            if( peer ) {
823                struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
824                atom->piece_data_time = time( NULL );
825            }
826            break;
827        }
828
829        case TR_PEER_PEER_PROGRESS: {
830            if( peer ) {
831                struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
832                const int peerIsSeed = e->progress >= 1.0;
833                if( peerIsSeed ) {
834                    tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr(&atom->addr,atom->port) );
835                    atom->flags |= ADDED_F_SEED_FLAG;
836                } else {
837                    tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr(&atom->addr,atom->port) );
838                    atom->flags &= ~ADDED_F_SEED_FLAG;
839                }
840            }
841            break;
842        }
843
844        case TR_PEER_CLIENT_GOT_BLOCK:
845        {
846            tr_torrent * tor = t->tor;
847
848            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
849
850            tr_cpBlockAdd( tor->completion, block );
851
852            if( tr_cpPieceIsComplete( tor->completion, e->pieceIndex ) )
853            {
854                const tr_piece_index_t p = e->pieceIndex;
855                const tr_errno err = tr_ioTestPiece( tor, p );
856
857                if( err ) {
858                    tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test: %s" ),
859                               (unsigned long)p, tr_errorString( err ) );
860                }
861
862                tr_torrentSetHasPiece( tor, p, !err );
863                tr_torrentSetPieceChecked( tor, p, TRUE );
864                tr_peerMgrSetBlame( tor->handle->peerMgr, tor->info.hash, p, !err );
865
866                if( err )
867                    gotBadPiece( t, p );
868
869                tr_torrentRecheckCompleteness( tor );
870            }
871            break;
872        }
873
874        case TR_PEER_ERROR:
875            if( TR_ERROR_IS_IO( e->err ) ) {
876                t->tor->error = e->err;
877                tr_strlcpy( t->tor->errorString, tr_errorString( e->err ), sizeof(t->tor->errorString) );
878                tr_torrentStop( t->tor );
879            } else if( e->err == TR_ERROR_ASSERT ) {
880                addStrike( t, peer );
881            }
882            peer->doPurge = 1;
883            break;
884
885        default:
886            assert(0);
887    }
888
889    torrentUnlock( t );
890}
891
892static void
893ensureAtomExists( Torrent * t, const struct in_addr * addr, uint16_t port, uint8_t flags, uint8_t from )
894{
895    if( getExistingAtom( t, addr ) == NULL )
896    {
897        struct peer_atom * a;
898        a = tr_new0( struct peer_atom, 1 );
899        a->addr = *addr;
900        a->port = port;
901        a->flags = flags;
902        a->from = from;
903        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr(&a->addr,a->port) );
904        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
905    }
906}
907
908static int
909getMaxPeerCount( const tr_torrent * tor UNUSED )
910{
911    return tor->maxConnectedPeers;
912}
913
914/* FIXME: this is kind of a mess. */
915static void
916myHandshakeDoneCB( tr_handshake    * handshake,
917                   tr_peerIo       * io,
918                   int               isConnected,
919                   const uint8_t   * peer_id,
920                   void            * vmanager )
921{
922    int ok = isConnected;
923    uint16_t port;
924    const struct in_addr * addr;
925    tr_peerMgr * manager = (tr_peerMgr*) vmanager;
926    Torrent * t;
927    tr_handshake * ours;
928
929    assert( io != NULL );
930    assert( isConnected==0 || isConnected==1 );
931
932    t = tr_peerIoHasTorrentHash( io )
933        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
934        : NULL;
935
936    if( tr_peerIoIsIncoming ( io ) )
937        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
938                                        handshake, handshakeCompare );
939    else if( t != NULL )
940        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
941                                        handshake, handshakeCompare );
942    else
943        ours = handshake;
944
945    assert( ours != NULL );
946    assert( ours == handshake );
947
948    if( t != NULL )
949        torrentLock( t );
950
951    addr = tr_peerIoGetAddress( io, &port );
952
953    if( !ok || !t || !t->isRunning )
954    {
955        if( t ) {
956            struct peer_atom * atom = getExistingAtom( t, addr );
957            if( atom )
958                ++atom->numFails;
959        }
960
961        tr_peerIoFree( io );
962    }
963    else /* looking good */
964    {
965        struct peer_atom * atom;
966        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
967        atom = getExistingAtom( t, addr );
968
969        if( atom->myflags & MYFLAG_BANNED )
970        {
971            tordbg( t, "banned peer %s tried to reconnect", tr_peerIoAddrStr(&atom->addr,atom->port) );
972            tr_peerIoFree( io );
973        }
974        else if( tr_ptrArraySize( t->peers ) >= getMaxPeerCount( t->tor ) )
975        {
976            tr_peerIoFree( io );
977        }
978        else
979        {
980            tr_peer * peer = getExistingPeer( t, addr );
981
982            if( peer != NULL ) /* we already have this peer */
983            {
984                tr_peerIoFree( io );
985            }
986            else
987            {
988                peer = getPeer( t, addr );
989                tr_free( peer->client );
990
991                if( !peer_id )
992                    peer->client = NULL;
993                else {
994                    char client[128];
995                    tr_clientForId( client, sizeof( client ), peer_id );
996                    peer->client = tr_strdup( client );
997                }
998                peer->port = port;
999                peer->io = io;
1000                peer->msgs = tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1001                atom->time = time( NULL );
1002            }
1003        }
1004    }
1005
1006    if( t != NULL )
1007        torrentUnlock( t );
1008}
1009
1010void
1011tr_peerMgrAddIncoming( tr_peerMgr      * manager,
1012                       struct in_addr  * addr,
1013                       uint16_t          port,
1014                       int               socket )
1015{
1016    managerLock( manager );
1017
1018    if( tr_blocklistHasAddress( manager->handle, addr ) )
1019    {
1020        tr_dbg( "Banned IP address \"%s\" tried to connect to us",
1021                inet_ntoa( *addr ) );
1022        tr_netClose( socket );
1023    }
1024    else if( getExistingHandshake( manager->incomingHandshakes, addr ) )
1025    {
1026        tr_netClose( socket );
1027    }
1028    else /* we don't have a connetion to them yet... */
1029    {
1030        tr_peerIo * io;
1031        tr_handshake * handshake;
1032
1033        io = tr_peerIoNewIncoming( manager->handle, addr, port, socket );
1034
1035        handshake = tr_handshakeNew( io,
1036                                     manager->handle->encryptionMode,
1037                                     myHandshakeDoneCB,
1038                                     manager );
1039
1040        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake, handshakeCompare );
1041    }
1042
1043    managerUnlock( manager );
1044}
1045
1046void
1047tr_peerMgrAddPex( tr_peerMgr     * manager,
1048                  const uint8_t  * torrentHash,
1049                  uint8_t          from,
1050                  const tr_pex   * pex )
1051{
1052    Torrent * t;
1053    managerLock( manager );
1054
1055    t = getExistingTorrent( manager, torrentHash );
1056    if( !tr_blocklistHasAddress( t->manager->handle, &pex->in_addr ) )
1057        ensureAtomExists( t, &pex->in_addr, pex->port, pex->flags, from );
1058
1059    managerUnlock( manager );
1060}
1061
1062tr_pex *
1063tr_peerMgrCompactToPex( const void  * compact,
1064                        size_t        compactLen,
1065                        const char  * added_f,
1066                        size_t      * pexCount )
1067{
1068    size_t i;
1069    size_t n = compactLen / 6;
1070    const uint8_t * walk = compact;
1071    tr_pex * pex = tr_new0( tr_pex, n );
1072    for( i=0; i<n; ++i ) {
1073        memcpy( &pex[i].in_addr, walk, 4 ); walk += 4;
1074        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1075        if( added_f )
1076            pex[i].flags = added_f[i];
1077    }
1078    *pexCount = n;
1079    return pex;
1080}
1081
1082/**
1083***
1084**/
1085
1086void
1087tr_peerMgrSetBlame( tr_peerMgr     * manager,
1088                    const uint8_t  * torrentHash,
1089                    int              pieceIndex,
1090                    int              success )
1091{
1092    if( !success )
1093    {
1094        int peerCount, i;
1095        Torrent * t = getExistingTorrent( manager, torrentHash );
1096        tr_peer ** peers;
1097
1098        assert( torrentIsLocked( t ) );
1099
1100        peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1101        for( i=0; i<peerCount; ++i )
1102        {
1103            tr_peer * peer = peers[i];
1104            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1105            {
1106                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1107                           tr_peerIoAddrStr(&peer->in_addr,peer->port),
1108                           pieceIndex, (int)peer->strikes+1 );
1109                addStrike( t, peer );
1110            }
1111        }
1112    }
1113}
1114
1115int
1116tr_pexCompare( const void * va, const void * vb )
1117{
1118    const tr_pex * a = (const tr_pex *) va;
1119    const tr_pex * b = (const tr_pex *) vb;
1120    int i = memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
1121    if( i ) return i;
1122    if( a->port < b->port ) return -1;
1123    if( a->port > b->port ) return 1;
1124    return 0;
1125}
1126
1127int tr_pexCompare( const void * a, const void * b );
1128
1129static int
1130peerPrefersCrypto( const tr_peer * peer )
1131{
1132    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1133        return TRUE;
1134
1135    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1136        return FALSE;
1137
1138    return tr_peerIoIsEncrypted( peer->io );
1139};
1140
1141int
1142tr_peerMgrGetPeers( tr_peerMgr      * manager,
1143                    const uint8_t   * torrentHash,
1144                    tr_pex         ** setme_pex )
1145{
1146    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1147    int i, peerCount;
1148    const tr_peer ** peers;
1149    tr_pex * pex;
1150    tr_pex * walk;
1151
1152    managerLock( manager );
1153
1154    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1155    pex = walk = tr_new( tr_pex, peerCount );
1156
1157    for( i=0; i<peerCount; ++i, ++walk )
1158    {
1159        const tr_peer * peer = peers[i];
1160
1161        walk->in_addr = peer->in_addr;
1162
1163        walk->port = peer->port;
1164
1165        walk->flags = 0;
1166        if( peerPrefersCrypto(peer) )  walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1167        if( peer->progress >= 1.0 )    walk->flags |= ADDED_F_SEED_FLAG;
1168    }
1169
1170    assert( ( walk - pex ) == peerCount );
1171    qsort( pex, peerCount, sizeof(tr_pex), tr_pexCompare );
1172    *setme_pex = pex;
1173
1174    managerUnlock( manager );
1175
1176    return peerCount;
1177}
1178
1179static int reconnectPulse( void * vtorrent );
1180static int rechokePulse( void * vtorrent );
1181
1182void
1183tr_peerMgrStartTorrent( tr_peerMgr     * manager,
1184                        const uint8_t  * torrentHash )
1185{
1186    Torrent * t;
1187
1188    managerLock( manager );
1189
1190    t = getExistingTorrent( manager, torrentHash );
1191
1192    assert( t != NULL );
1193    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1194    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1195
1196    if( !t->isRunning )
1197    {
1198        t->isRunning = 1;
1199
1200        t->reconnectTimer = tr_timerNew( t->manager->handle,
1201                                         reconnectPulse, t,
1202                                         RECONNECT_PERIOD_MSEC );
1203
1204        t->rechokeTimer = tr_timerNew( t->manager->handle,
1205                                       rechokePulse, t,
1206                                       RECHOKE_PERIOD_MSEC );
1207
1208        reconnectPulse( t );
1209
1210        rechokePulse( t );
1211    }
1212
1213    managerUnlock( manager );
1214}
1215
1216static void
1217stopTorrent( Torrent * t )
1218{
1219    assert( torrentIsLocked( t ) );
1220
1221    t->isRunning = 0;
1222    tr_timerFree( &t->rechokeTimer );
1223    tr_timerFree( &t->reconnectTimer );
1224
1225    /* disconnect the peers. */
1226    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)peerDestructor );
1227    tr_ptrArrayClear( t->peers );
1228
1229    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1230     * which removes the handshake from t->outgoingHandshakes... */
1231    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1232        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1233}
1234void
1235tr_peerMgrStopTorrent( tr_peerMgr     * manager,
1236                       const uint8_t  * torrentHash)
1237{
1238    managerLock( manager );
1239
1240    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1241
1242    managerUnlock( manager );
1243}
1244
1245void
1246tr_peerMgrAddTorrent( tr_peerMgr * manager,
1247                      tr_torrent * tor )
1248{
1249    Torrent * t;
1250
1251    managerLock( manager );
1252
1253    assert( tor != NULL );
1254    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1255
1256    t = torrentConstructor( manager, tor );
1257    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1258
1259    managerUnlock( manager );
1260}
1261
1262void
1263tr_peerMgrRemoveTorrent( tr_peerMgr     * manager,
1264                         const uint8_t  * torrentHash )
1265{
1266    Torrent * t;
1267
1268    managerLock( manager );
1269
1270    t = getExistingTorrent( manager, torrentHash );
1271    assert( t != NULL );
1272    stopTorrent( t );
1273    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1274    torrentDestructor( t );
1275
1276    managerUnlock( manager );
1277}
1278
1279void
1280tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1281                               const uint8_t    * torrentHash,
1282                               int8_t           * tab,
1283                               int                tabCount )
1284{
1285    int i;
1286    const Torrent * t;
1287    const tr_torrent * tor;
1288    float interval;
1289    int isComplete;
1290    int peerCount;
1291    const tr_peer ** peers;
1292
1293    managerLock( (tr_peerMgr*)manager );
1294
1295    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1296    tor = t->tor;
1297    interval = tor->info.pieceCount / (float)tabCount;
1298    isComplete = tor && ( tr_cpGetStatus ( tor->completion ) == TR_CP_COMPLETE );
1299    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1300
1301    memset( tab, 0, tabCount );
1302
1303    for( i=0; i<tabCount; ++i )
1304    {
1305        const int piece = i * interval;
1306
1307        if( tor == NULL )
1308            tab[i] = 0;
1309        else if( isComplete || tr_cpPieceIsComplete( tor->completion, piece ) )
1310            tab[i] = -1;
1311        else if( peerCount ) {
1312            int j;
1313            for( j=0; j<peerCount; ++j )
1314                if( tr_bitfieldHas( peers[j]->have, i ) )
1315                    ++tab[i];
1316        }
1317    }
1318
1319    managerUnlock( (tr_peerMgr*)manager );
1320}
1321
1322/* Returns the pieces that are available from peers */
1323tr_bitfield*
1324tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1325                        const uint8_t    * torrentHash )
1326{
1327    int i, size;
1328    Torrent * t;
1329    tr_peer ** peers;
1330    tr_bitfield * pieces;
1331    managerLock( (tr_peerMgr*)manager );
1332
1333    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1334    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1335    peers = getConnectedPeers( t, &size );
1336    for( i=0; i<size; ++i )
1337        tr_bitfieldOr( pieces, peers[i]->have );
1338
1339    managerUnlock( (tr_peerMgr*)manager );
1340    tr_free( peers );
1341    return pieces;
1342}
1343
1344int
1345tr_peerMgrHasConnections( const tr_peerMgr * manager,
1346                          const uint8_t    * torrentHash )
1347{
1348    int ret;
1349    const Torrent * t;
1350    managerLock( (tr_peerMgr*)manager );
1351
1352    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1353    ret = t && tr_ptrArraySize( t->peers );
1354
1355    managerUnlock( (tr_peerMgr*)manager );
1356    return ret;
1357}
1358
1359void
1360tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1361                        const uint8_t    * torrentHash,
1362                        int              * setmePeersKnown,
1363                        int              * setmePeersConnected,
1364                        int              * setmeSeedsConnected,
1365                        int              * setmePeersSendingToUs,
1366                        int              * setmePeersGettingFromUs,
1367                        int              * setmePeersFrom )
1368{
1369    int i, size;
1370    const Torrent * t;
1371    const tr_peer ** peers;
1372
1373    managerLock( (tr_peerMgr*)manager );
1374
1375    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1376    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1377
1378    *setmePeersKnown          = tr_ptrArraySize( t->pool );
1379    *setmePeersConnected      = 0;
1380    *setmeSeedsConnected      = 0;
1381    *setmePeersSendingToUs    = 0;
1382    *setmePeersGettingFromUs  = 0;
1383
1384    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1385        setmePeersFrom[i] = 0;
1386
1387    for( i=0; i<size; ++i )
1388    {
1389        const tr_peer * peer = peers[i];
1390        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1391
1392        if( peer->io == NULL ) /* not connected */
1393            continue;
1394
1395        ++*setmePeersConnected;
1396
1397        ++setmePeersFrom[atom->from];
1398
1399        if( clientIsDownloadingFrom( peer ) )
1400            ++*setmePeersSendingToUs;
1401
1402        if( clientIsUploadingTo( peer ) )
1403            ++*setmePeersGettingFromUs;
1404
1405        if( atom->flags & ADDED_F_SEED_FLAG )
1406            ++*setmeSeedsConnected;
1407    }
1408
1409    managerUnlock( (tr_peerMgr*)manager );
1410}
1411
1412struct tr_peer_stat *
1413tr_peerMgrPeerStats( const tr_peerMgr  * manager,
1414                     const uint8_t     * torrentHash,
1415                     int               * setmeCount UNUSED )
1416{
1417    int i, size;
1418    const Torrent * t;
1419    tr_peer ** peers;
1420    tr_peer_stat * ret;
1421
1422    assert( manager != NULL );
1423    managerLock( (tr_peerMgr*)manager );
1424
1425    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1426    peers = getConnectedPeers( (Torrent*)t, &size );
1427    ret = tr_new0( tr_peer_stat, size );
1428
1429    for( i=0; i<size; ++i )
1430    {
1431        char * pch;
1432        const tr_peer * peer = peers[i];
1433        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1434        tr_peer_stat * stat = ret + i;
1435
1436        tr_netNtop( &peer->in_addr, stat->addr, sizeof(stat->addr) );
1437        tr_strlcpy( stat->client, (peer->client ? peer->client : ""), sizeof(stat->client) );
1438        stat->port               = peer->port;
1439        stat->from               = atom->from;
1440        stat->progress           = peer->progress;
1441        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1442        stat->uploadToRate       = peer->rateToPeer;
1443        stat->downloadFromRate   = peer->rateToClient;
1444        stat->peerIsChoked       = peer->peerIsChoked;
1445        stat->peerIsInterested   = peer->peerIsInterested;
1446        stat->clientIsChoked     = peer->clientIsChoked;
1447        stat->clientIsInterested = peer->clientIsInterested;
1448        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1449        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1450        stat->isUploadingTo      = clientIsUploadingTo( peer );
1451
1452        pch = stat->flagStr;
1453        if( t->optimistic == peer ) *pch++ = 'O';
1454        if( stat->isDownloadingFrom ) *pch++ = 'D';
1455        else if( stat->clientIsInterested ) *pch++ = 'd';
1456        if( stat->isUploadingTo ) *pch++ = 'U';
1457        else if( stat->peerIsInterested ) *pch++ = 'u';
1458        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
1459        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1460        if( stat->isEncrypted ) *pch++ = 'E';
1461        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1462        if( stat->isIncoming ) *pch++ = 'I';
1463        *pch = '\0';
1464    }
1465
1466    *setmeCount = size;
1467    tr_free( peers );
1468
1469    managerUnlock( (tr_peerMgr*)manager );
1470    return ret;
1471}
1472
1473/**
1474***
1475**/
1476
1477struct ChokeData
1478{
1479    uint8_t doUnchoke;
1480    uint8_t isInterested;
1481    uint32_t rate;
1482    tr_peer * peer;
1483};
1484
1485static int
1486compareChoke( const void * va, const void * vb )
1487{
1488    const struct ChokeData * a = va;
1489    const struct ChokeData * b = vb;
1490    return -tr_compareUint32( a->rate, b->rate );
1491}
1492
1493static int
1494isNew( const tr_peer * peer )
1495{
1496    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1497}
1498
1499static int
1500isSame( const tr_peer * peer )
1501{
1502    return peer && peer->client && strstr( peer->client, "Transmission" );
1503}
1504
1505/**
1506***
1507**/
1508
1509static int
1510getWeightedRate( const tr_peer * peer, int clientIsSeed )
1511{
1512    return (int)( 10.0 * ( clientIsSeed ? peer->rateToPeer
1513                                        : peer->rateToClient ) );
1514}
1515
1516static void
1517rechoke( Torrent * t )
1518{
1519    int i, peerCount, size, unchokedInterested;
1520    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1521    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1522    const int clientIsSeed = tr_torrentIsSeed( t->tor );
1523
1524    assert( torrentIsLocked( t ) );
1525   
1526    /* sort the peers by preference and rate */
1527    for( i=0, size=0; i<peerCount; ++i )
1528    {
1529        tr_peer * peer = peers[i];
1530        if( peer->progress >= 1.0 ) /* choke all seeds */
1531            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1532        else {
1533            struct ChokeData * node = &choke[size++];
1534            node->peer = peer;
1535            node->isInterested = peer->peerIsInterested;
1536            node->rate = getWeightedRate( peer, clientIsSeed );
1537        }
1538    }
1539
1540    qsort( choke, size, sizeof(struct ChokeData), compareChoke );
1541
1542    /**
1543     * Reciprocation and number of uploads capping is managed by unchoking
1544     * the N peers which have the best upload rate and are interested.
1545     * This maximizes the client's download rate. These N peers are
1546     * referred to as downloaders, because they are interested in downloading
1547     * from the client.
1548     *
1549     * Peers which have a better upload rate (as compared to the downloaders)
1550     * but aren't interested get unchoked. If they become interested, the
1551     * downloader with the worst upload rate gets choked. If a client has
1552     * a complete file, it uses its upload rate rather than its download
1553     * rate to decide which peers to unchoke.
1554     */
1555    unchokedInterested = 0;
1556    for( i=0; i<size && unchokedInterested<MAX_UNCHOKED_PEERS; ++i ) {
1557        choke[i].doUnchoke = 1;
1558        if( choke[i].isInterested )
1559            ++unchokedInterested;
1560    }
1561
1562    /* optimistic unchoke */
1563    if( i < size )
1564    {
1565        int n;
1566        struct ChokeData * c;
1567        tr_ptrArray * randPool = tr_ptrArrayNew( );
1568
1569        for( ; i<size; ++i )
1570        {
1571            if( choke[i].isInterested )
1572            {
1573                const tr_peer * peer = choke[i].peer;
1574                int x=1, y;
1575                if( isNew( peer ) ) x *= 3;
1576                if( isSame( peer ) ) x *= 3;
1577                for( y=0; y<x; ++y )
1578                    tr_ptrArrayAppend( randPool, choke );
1579            }
1580        }
1581
1582        if(( n = tr_ptrArraySize( randPool )))
1583        {
1584            c = tr_ptrArrayNth( randPool, tr_rand( n ));
1585            c->doUnchoke = 1;
1586            t->optimistic = c->peer;
1587        }
1588
1589        tr_ptrArrayFree( randPool, NULL );
1590    }
1591
1592    for( i=0; i<size; ++i )
1593        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1594
1595    /* cleanup */
1596    tr_free( choke );
1597    tr_free( peers );
1598}
1599
1600static int
1601rechokePulse( void * vtorrent )
1602{
1603    Torrent * t = vtorrent;
1604    torrentLock( t );
1605    rechoke( t );
1606    torrentUnlock( t );
1607    return TRUE;
1608}
1609
1610/***
1611****
1612****  Life and Death
1613****
1614***/
1615
1616static int
1617shouldPeerBeClosed( const Torrent * t, const tr_peer * peer, int peerCount )
1618{
1619    const tr_torrent * tor = t->tor;
1620    const time_t now = time( NULL );
1621    const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1622
1623    /* if it's marked for purging, close it */
1624    if( peer->doPurge ) {
1625        tordbg( t, "purging peer %s because its doPurge flag is set", tr_peerIoAddrStr(&atom->addr,atom->port) );
1626        return TRUE;
1627    }
1628
1629    /* if we're seeding and the peer has everything we have,
1630     * and enough time has passed for a pex exchange, then disconnect */
1631    if( tr_torrentIsSeed( tor ) ) {
1632        int peerHasEverything;
1633        if( atom->flags & ADDED_F_SEED_FLAG )
1634            peerHasEverything = TRUE;
1635        else if( peer->progress < tr_cpPercentDone( tor->completion ) )
1636            peerHasEverything = FALSE;
1637        else {
1638            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( tor->completion ) );
1639            tr_bitfieldDifference( tmp, peer->have );
1640            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
1641            tr_bitfieldFree( tmp );
1642        }
1643        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30) ) ) {
1644            tordbg( t, "purging peer %s because we're both seeds", tr_peerIoAddrStr(&atom->addr,atom->port) );
1645            return TRUE;
1646        }
1647    }
1648
1649    /* disconnect if it's been too long since piece data has been transferred.
1650     * this is on a sliding scale based on number of available peers... */
1651    {
1652        const int relaxStrictnessIfFewerThanN = (int)((getMaxPeerCount(tor) * 0.9) + 0.5);
1653        /* if we have >= relaxIfFewerThan, strictness is 100%.
1654         * if we have zero connections, strictness is 0% */
1655        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
1656            ? 1.0
1657            : peerCount / (float)relaxStrictnessIfFewerThanN;
1658        const int lo = MIN_UPLOAD_IDLE_SECS;
1659        const int hi = MAX_UPLOAD_IDLE_SECS;
1660        const int limit = lo + ((hi-lo) * strictness);
1661        const time_t then = peer->pieceDataActivityDate;
1662        const int idleTime = then ? (now-then) : 0;
1663        if( idleTime > limit ) {
1664            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
1665                       tr_peerIoAddrStr(&atom->addr,atom->port), idleTime );
1666            return TRUE;
1667        }
1668    }
1669
1670    return FALSE;
1671}
1672
1673static tr_peer **
1674getPeersToClose( Torrent * t, int * setmeSize )
1675{
1676    int i, peerCount, outsize;
1677    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( t->peers, &peerCount );
1678    struct tr_peer ** ret = tr_new( tr_peer*, peerCount );
1679
1680    assert( torrentIsLocked( t ) );
1681
1682    for( i=outsize=0; i<peerCount; ++i )
1683        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
1684            ret[outsize++] = peers[i];
1685
1686    *setmeSize = outsize;
1687    return ret;
1688}
1689
1690static int
1691compareCandidates( const void * va, const void * vb )
1692{
1693    const struct peer_atom * a = * (const struct peer_atom**) va;
1694    const struct peer_atom * b = * (const struct peer_atom**) vb;
1695    int i;
1696
1697    if( a->piece_data_time > b->piece_data_time ) return -1;
1698    if( a->piece_data_time < b->piece_data_time ) return  1;
1699
1700    if(( i = tr_compareUint16( a->numFails, b->numFails )))
1701        return i;
1702
1703    if( a->time != b->time )
1704        return a->time < b->time ? -1 : 1;
1705
1706    return 0;
1707}
1708
1709static struct peer_atom **
1710getPeerCandidates( Torrent * t, int * setmeSize )
1711{
1712    int i, atomCount, retCount;
1713    struct peer_atom ** atoms;
1714    struct peer_atom ** ret;
1715    const time_t now = time( NULL );
1716    const int seed = tr_torrentIsSeed( t->tor );
1717
1718    assert( torrentIsLocked( t ) );
1719
1720    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &atomCount );
1721    ret = tr_new( struct peer_atom*, atomCount );
1722    for( i=retCount=0; i<atomCount; ++i )
1723    {
1724        struct peer_atom * atom = atoms[i];
1725
1726        /* peer fed us too much bad data ... we only keep it around
1727         * now to weed it out in case someone sends it to us via pex */
1728        if( atom->myflags & MYFLAG_BANNED )
1729            continue;
1730
1731        /* peer was unconnectable before, so we're not going to keep trying.
1732         * this is needs a separate flag from `banned', since if they try
1733         * to connect to us later, we'll let them in */
1734        if( atom->myflags & MYFLAG_UNREACHABLE )
1735            continue;
1736
1737        /* we don't need two connections to the same peer... */
1738        if( peerIsInUse( t, &atom->addr ) )
1739            continue;
1740
1741        /* no need to connect if we're both seeds... */
1742        if( seed && (atom->flags & ADDED_F_SEED_FLAG) )
1743            continue;
1744
1745        /* we're wasting our time trying to connect to this bozo. */
1746        if( atom->numFails > 3 )
1747            continue;
1748
1749        /* If we were connected to this peer recently and transferring
1750         * piece data, try to reconnect -- network troubles may have
1751         * disconnected us.  but if we weren't sharing piece data,
1752         * hold off on this peer to give another one a try instead */
1753        if( ( now - atom->piece_data_time ) > 30 )
1754        {
1755            int minWait = (60 * 10); /* ten minutes */
1756            int maxWait = (60 * 30); /* thirty minutes */
1757            int wait = atom->numFails * minWait;
1758            if( wait < minWait ) wait = minWait;
1759            if( wait > maxWait ) wait = maxWait;
1760            if( ( now - atom->time ) < wait ) {
1761                tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
1762                        i, tr_peerIoAddrStr(&atom->addr,atom->port), wait );
1763                continue;
1764            }
1765        }
1766
1767        /* Don't connect to peers in our blocklist */
1768        if( tr_blocklistHasAddress( t->manager->handle, &atom->addr ) )
1769            continue;
1770
1771        ret[retCount++] = atom;
1772    }
1773
1774    qsort( ret, retCount, sizeof(struct peer_atom*), compareCandidates );
1775    *setmeSize = retCount;
1776    return ret;
1777}
1778
1779static int
1780reconnectPulse( void * vtorrent )
1781{
1782    Torrent * t = vtorrent;
1783    static time_t prevTime = 0;
1784    static int newConnectionsThisSecond = 0;
1785    time_t now;
1786
1787    torrentLock( t );
1788
1789    now = time( NULL );
1790    if( prevTime != now )
1791    {
1792        prevTime = now;
1793        newConnectionsThisSecond = 0;
1794    }
1795
1796    if( !t->isRunning )
1797    {
1798        removeAllPeers( t );
1799    }
1800    else
1801    {
1802        int i, nCandidates, nBad;
1803        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
1804        struct tr_peer ** connections = getPeersToClose( t, &nBad );
1805
1806        if( nBad || nCandidates )
1807            tordbg( t, "reconnect pulse for [%s]: %d bad connections, "
1808                       "%d connection candidates, %d atoms, max per pulse is %d",
1809                       t->tor->info.name, nBad, nCandidates,
1810                       tr_ptrArraySize(t->pool),
1811                       (int)MAX_RECONNECTIONS_PER_PULSE );
1812
1813        /* disconnect some peers.
1814           if we got transferred piece data, then they might be good peers,
1815           so reset their `numFails' weight to zero.  otherwise we connected
1816           to them fruitlessly, so mark it as another fail */
1817        for( i=0; i<nBad; ++i ) {
1818            tr_peer * peer = connections[i];
1819            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1820            if( peer->pieceDataActivityDate )
1821                atom->numFails = 0;
1822            else
1823                ++atom->numFails;
1824            removePeer( t, peer );
1825        }
1826
1827        /* add some new ones */
1828        for( i=0;    i < nCandidates
1829                  && i < MAX_RECONNECTIONS_PER_PULSE
1830                  && newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND; ++i )
1831        {
1832            tr_peerMgr * mgr = t->manager;
1833            struct peer_atom * atom = candidates[i];
1834            tr_peerIo * io;
1835
1836            tordbg( t, "Starting an OUTGOING connection with %s",
1837                       tr_peerIoAddrStr( &atom->addr, atom->port ) );
1838
1839            io = tr_peerIoNewOutgoing( mgr->handle, &atom->addr, atom->port, t->hash );
1840            if( io == NULL )
1841            {
1842                atom->myflags |= MYFLAG_UNREACHABLE;
1843            }
1844            else
1845            {
1846                tr_handshake * handshake = tr_handshakeNew( io,
1847                                                            mgr->handle->encryptionMode,
1848                                                            myHandshakeDoneCB,
1849                                                            mgr );
1850
1851                assert( tr_peerIoGetTorrentHash( io ) != NULL );
1852
1853                ++newConnectionsThisSecond;
1854
1855                tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake, handshakeCompare );
1856            }
1857
1858            atom->time = time( NULL );
1859        }
1860
1861        /* cleanup */
1862        tr_free( connections );
1863        tr_free( candidates );
1864    }
1865
1866    torrentUnlock( t );
1867    return TRUE;
1868}
Note: See TracBrowser for help on using the repository browser.