source: trunk/libtransmission/peer-mgr.c @ 6406

Last change on this file since 6406 was 6406, checked in by charles, 13 years ago

(daemon) #1107: transmission-remote -t[n] -i should display webseeding info

  • Property svn:keywords set to Date Rev Author Id
File size: 54.9 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 6406 2008-07-27 14:10:32Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp, strstr */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "blocklist.h"
23#include "clients.h"
24#include "completion.h"
25#include "crypto.h"
26#include "handshake.h"
27#include "inout.h" /* tr_ioTestPiece */
28#include "net.h"
29#include "peer-io.h"
30#include "peer-mgr.h"
31#include "peer-mgr-private.h"
32#include "peer-msgs.h"
33#include "ptrarray.h"
34#include "ratecontrol.h"
35#include "stats.h" /* tr_statsAddDownloaded */
36#include "torrent.h"
37#include "trevent.h"
38#include "utils.h"
39#include "webseed.h"
40
41enum
42{
43    /* how frequently to change which peers are choked */
44    RECHOKE_PERIOD_MSEC = (10 * 1000),
45
46    /* minimum interval for refilling peers' request lists */
47    REFILL_PERIOD_MSEC = 333,
48
49    /* when many peers are available, keep idle ones this long */
50    MIN_UPLOAD_IDLE_SECS = (60 * 3),
51
52    /* when few peers are available, keep idle ones this long */
53    MAX_UPLOAD_IDLE_SECS = (60 * 10),
54
55    /* how frequently to decide which peers live and die */
56    RECONNECT_PERIOD_MSEC = (2 * 1000),
57
58    /* max # of peers to ask fer per torrent per reconnect pulse */
59    MAX_RECONNECTIONS_PER_PULSE = 4,
60
61    /* max number of peers to ask for per second overall.
62     * this throttle is to avoid overloading the router */
63    MAX_CONNECTIONS_PER_SECOND = 8,
64
65    /* number of unchoked peers per torrent.
66     * FIXME: this probably ought to be configurable */
67    MAX_UNCHOKED_PEERS = 12,
68
69    /* number of bad pieces a peer is allowed to send before we ban them */
70    MAX_BAD_PIECES_PER_PEER = 3,
71
72    /* use for bitwise operations w/peer_atom.myflags */
73    MYFLAG_BANNED = 1,
74
75    /* unreachable for now... but not banned.  if they try to connect to us it's okay */
76    MYFLAG_UNREACHABLE = 2
77};
78
79
80/**
81***
82**/
83
84/* We keep one of these for every peer we know about, whether
85 * it's connected or not, so the struct must be small.
86 * When our current connections underperform, we dip back
87 * into this list for new ones. */
88struct peer_atom
89{   
90    uint8_t from;
91    uint8_t flags; /* these match the added_f flags */
92    uint8_t myflags; /* flags that aren't defined in added_f */
93    uint16_t port;
94    uint16_t numFails;
95    struct in_addr addr;
96    time_t time;
97    time_t piece_data_time;
98};
99
100typedef struct
101{
102    uint8_t hash[SHA_DIGEST_LENGTH];
103    tr_ptrArray * outgoingHandshakes; /* tr_handshake */
104    tr_ptrArray * pool; /* struct peer_atom */
105    tr_ptrArray * peers; /* tr_peer */
106    tr_ptrArray * webseeds; /* tr_webseed */
107    tr_timer * reconnectTimer;
108    tr_timer * rechokeTimer;
109    tr_timer * refillTimer;
110    tr_torrent * tor;
111    tr_peer * optimistic; /* the optimistic peer, or NULL if none */
112    tr_bitfield * requestedPieces;
113
114    unsigned int isRunning : 1;
115
116    struct tr_peerMgr * manager;
117}
118Torrent;
119
120struct tr_peerMgr
121{
122    tr_handle * handle;
123    tr_ptrArray * torrents; /* Torrent */
124    tr_ptrArray * incomingHandshakes; /* tr_handshake */
125};
126
127#define tordbg(t, fmt...) tr_deepLog( __FILE__, __LINE__, t->tor->info.name, ##fmt )
128
129/**
130***
131**/
132
133static void
134managerLock( const struct tr_peerMgr * manager )
135{
136    tr_globalLock( manager->handle );
137}
138static void
139managerUnlock( const struct tr_peerMgr * manager )
140{
141    tr_globalUnlock( manager->handle );
142}
143static void
144torrentLock( Torrent * torrent )
145{
146    managerLock( torrent->manager );
147}
148static void
149torrentUnlock( Torrent * torrent )
150{
151    managerUnlock( torrent->manager );
152}
153static int
154torrentIsLocked( const Torrent * t )
155{
156    return tr_globalIsLocked( t->manager->handle );
157}
158
159/**
160***
161**/
162
163static int
164compareAddresses( const struct in_addr * a, const struct in_addr * b )
165{
166    return tr_compareUint32( a->s_addr, b->s_addr );
167}
168
169static int
170handshakeCompareToAddr( const void * va, const void * vb )
171{
172    const tr_handshake * a = va;
173    return compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
174}
175
176static int
177handshakeCompare( const void * a, const void * b )
178{
179    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
180}
181
182static tr_handshake*
183getExistingHandshake( tr_ptrArray * handshakes, const struct in_addr * in_addr )
184{
185    return tr_ptrArrayFindSorted( handshakes,
186                                  in_addr,
187                                  handshakeCompareToAddr );
188}
189
190static int
191comparePeerAtomToAddress( const void * va, const void * vb )
192{
193    const struct peer_atom * a = va;
194    return compareAddresses( &a->addr, vb );
195}
196
197static int
198comparePeerAtoms( const void * va, const void * vb )
199{
200    const struct peer_atom * b = vb;
201    return comparePeerAtomToAddress( va, &b->addr );
202}
203
204/**
205***
206**/
207
208static int
209torrentCompare( const void * va, const void * vb )
210{
211    const Torrent * a = va;
212    const Torrent * b = vb;
213    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
214}
215
216static int
217torrentCompareToHash( const void * va, const void * vb )
218{
219    const Torrent * a = va;
220    const uint8_t * b_hash = vb;
221    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
222}
223
224static Torrent*
225getExistingTorrent( tr_peerMgr * manager, const uint8_t * hash )
226{
227    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
228                                             hash,
229                                             torrentCompareToHash );
230}
231
232static int
233peerCompare( const void * va, const void * vb )
234{
235    const tr_peer * a = va;
236    const tr_peer * b = vb;
237    return compareAddresses( &a->in_addr, &b->in_addr );
238}
239
240static int
241peerCompareToAddr( const void * va, const void * vb )
242{
243    const tr_peer * a = va;
244    return compareAddresses( &a->in_addr, vb );
245}
246
247static tr_peer*
248getExistingPeer( Torrent * torrent, const struct in_addr * in_addr )
249{
250    assert( torrentIsLocked( torrent ) );
251    assert( in_addr != NULL );
252
253    return (tr_peer*) tr_ptrArrayFindSorted( torrent->peers,
254                                             in_addr,
255                                             peerCompareToAddr );
256}
257
258static struct peer_atom*
259getExistingAtom( const Torrent * t, const struct in_addr * addr )
260{
261    assert( torrentIsLocked( t ) );
262    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
263}
264
265static int
266peerIsInUse( const Torrent * ct, const struct in_addr * addr )
267{
268    Torrent * t = (Torrent*) ct;
269
270    assert( torrentIsLocked ( t ) );
271
272    return getExistingPeer( t, addr )
273        || getExistingHandshake( t->outgoingHandshakes, addr )
274        || getExistingHandshake( t->manager->incomingHandshakes, addr );
275}
276
277static tr_peer*
278peerConstructor( const struct in_addr * in_addr )
279{
280    tr_peer * p;
281    p = tr_new0( tr_peer, 1 );
282    p->rcToClient = tr_rcInit( );
283    p->rcToPeer = tr_rcInit( );
284    memcpy( &p->in_addr, in_addr, sizeof(struct in_addr) );
285    return p;
286}
287
288static tr_peer*
289getPeer( Torrent * torrent, const struct in_addr * in_addr )
290{
291    tr_peer * peer;
292
293    assert( torrentIsLocked( torrent ) );
294
295    peer = getExistingPeer( torrent, in_addr );
296
297    if( peer == NULL ) {
298        peer = peerConstructor( in_addr );
299        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
300    }
301
302    return peer;
303}
304
305static void
306peerDestructor( tr_peer * peer )
307{
308    assert( peer != NULL );
309    assert( peer->msgs != NULL );
310
311    tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
312    tr_peerMsgsFree( peer->msgs );
313
314    tr_peerIoFree( peer->io );
315
316    tr_bitfieldFree( peer->have );
317    tr_bitfieldFree( peer->blame );
318    tr_rcClose( peer->rcToClient );
319    tr_rcClose( peer->rcToPeer );
320    tr_free( peer->client );
321    tr_free( peer );
322}
323
324static void
325removePeer( Torrent * t, tr_peer * peer )
326{
327    tr_peer * removed;
328    struct peer_atom * atom;
329
330    assert( torrentIsLocked( t ) );
331
332    atom = getExistingAtom( t, &peer->in_addr );
333    assert( atom != NULL );
334    atom->time = time( NULL );
335
336    removed = tr_ptrArrayRemoveSorted( t->peers, peer, peerCompare );
337    assert( removed == peer );
338    peerDestructor( removed );
339}
340
341static void
342removeAllPeers( Torrent * t )
343{
344    while( !tr_ptrArrayEmpty( t->peers ) )
345        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
346}
347
348static void
349torrentDestructor( Torrent * t )
350{
351    uint8_t hash[SHA_DIGEST_LENGTH];
352
353    assert( t != NULL );
354    assert( !t->isRunning );
355    assert( t->peers != NULL );
356    assert( torrentIsLocked( t ) );
357    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
358    assert( tr_ptrArrayEmpty( t->peers ) );
359
360    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
361
362    tr_timerFree( &t->reconnectTimer );
363    tr_timerFree( &t->rechokeTimer );
364    tr_timerFree( &t->refillTimer );
365
366    tr_bitfieldFree( t->requestedPieces );
367    tr_ptrArrayFree( t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
368    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
369    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
370    tr_ptrArrayFree( t->peers, NULL );
371
372    tr_free( t );
373}
374
375static void peerCallbackFunc( void * vpeer, void * vevent, void * vt );
376
377static Torrent*
378torrentConstructor( tr_peerMgr * manager, tr_torrent * tor )
379{
380    int i;
381    Torrent * t;
382
383    t = tr_new0( Torrent, 1 );
384    t->manager = manager;
385    t->tor = tor;
386    t->pool = tr_ptrArrayNew( );
387    t->peers = tr_ptrArrayNew( );
388    t->webseeds = tr_ptrArrayNew( );
389    t->outgoingHandshakes = tr_ptrArrayNew( );
390    t->requestedPieces = tr_bitfieldNew( tor->info.pieceCount );
391    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
392
393    for( i=0; i<tor->info.webseedCount; ++i ) {
394        tr_webseed * w = tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
395        tr_ptrArrayAppend( t->webseeds, w );
396    }
397
398    return t;
399}
400
401/**
402 * For explanation, see http://www.bittorrent.org/fast_extensions.html
403 * Also see the "test-allowed-set" unit test
404 */
405struct tr_bitfield *
406tr_peerMgrGenerateAllowedSet( const uint32_t         k,         /* number of pieces in set */
407                              const uint32_t         sz,        /* number of pieces in torrent */
408                              const uint8_t        * infohash,  /* torrent's SHA1 hash*/
409                              const struct in_addr * ip )       /* peer's address */
410{
411    uint8_t w[SHA_DIGEST_LENGTH + 4];
412    uint8_t x[SHA_DIGEST_LENGTH];
413    tr_bitfield_t * a;
414    uint32_t a_size;
415
416    *(uint32_t*)w = ntohl( htonl(ip->s_addr) & 0xffffff00 );   /* (1) */
417    memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );              /* (2) */
418    tr_sha1( x, w, sizeof( w ), NULL );                        /* (3) */
419
420    a = tr_bitfieldNew( sz );
421    a_size = 0;
422   
423    while( a_size < k )
424    {
425        int i;
426        for ( i=0; i<5 && a_size<k; ++i )                      /* (4) */
427        {
428            uint32_t j = i * 4;                                /* (5) */
429            uint32_t y = ntohl(*(uint32_t*)(x+j));             /* (6) */
430            uint32_t index = y % sz;                           /* (7) */
431            if ( !tr_bitfieldHas( a, index ) ) {               /* (8) */
432                tr_bitfieldAdd( a, index );                    /* (9) */
433                ++a_size;
434            }
435        }
436        tr_sha1( x, x, sizeof( x ), NULL );                    /* (3) */
437    }
438   
439    return a;
440}
441
442tr_peerMgr*
443tr_peerMgrNew( tr_handle * handle )
444{
445    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
446    m->handle = handle;
447    m->torrents = tr_ptrArrayNew( );
448    m->incomingHandshakes = tr_ptrArrayNew( );
449    return m;
450}
451
452void
453tr_peerMgrFree( tr_peerMgr * manager )
454{
455    managerLock( manager );
456
457    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
458     * the item from manager->handshakes, so this is a little roundabout... */
459    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
460        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
461    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
462
463    /* free the torrents. */
464    tr_ptrArrayFree( manager->torrents, (PtrArrayForeachFunc)torrentDestructor );
465
466    managerUnlock( manager );
467    tr_free( manager );
468}
469
470static tr_peer**
471getConnectedPeers( Torrent * t, int * setmeCount )
472{
473    int i, peerCount, connectionCount;
474    tr_peer **peers;
475    tr_peer **ret;
476
477    assert( torrentIsLocked( t ) );
478
479    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
480    ret = tr_new( tr_peer*, peerCount );
481
482    for( i=connectionCount=0; i<peerCount; ++i )
483        if( peers[i]->msgs != NULL )
484            ret[connectionCount++] = peers[i];
485
486    *setmeCount = connectionCount;
487    return ret;
488}
489
490static int
491clientIsDownloadingFrom( const tr_peer * peer )
492{
493    return peer->clientIsInterested && !peer->clientIsChoked;
494}
495
496static int
497clientIsUploadingTo( const tr_peer * peer )
498{
499    return peer->peerIsInterested && !peer->peerIsChoked;
500}
501
502/***
503****
504***/
505
506int
507tr_peerMgrPeerIsSeed( const tr_peerMgr       * mgr,
508                      const uint8_t          * torrentHash,
509                      const struct in_addr   * addr )
510{
511    int isSeed = FALSE;
512    const Torrent * t = NULL;
513    const struct peer_atom * atom = NULL;
514
515    t = getExistingTorrent( (tr_peerMgr*)mgr, torrentHash );
516    if( t )
517        atom = getExistingAtom( t, addr );
518    if( atom )
519        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
520
521    return isSeed;
522}
523
524/***
525****  Refill
526***/
527
528struct tr_refill_piece
529{
530    tr_priority_t priority;
531    int missingBlockCount;
532    uint16_t random;
533    uint32_t piece;
534    uint32_t peerCount;
535};
536
537static int
538compareRefillPiece (const void * aIn, const void * bIn)
539{
540    const struct tr_refill_piece * a = aIn;
541    const struct tr_refill_piece * b = bIn;
542
543    /* if one piece has a higher priority, it goes first */
544    if( a->priority != b->priority )
545        return a->priority > b->priority ? -1 : 1;
546   
547    /* otherwise if one has fewer peers, it goes first */
548    if (a->peerCount != b->peerCount)
549        return a->peerCount < b->peerCount ? -1 : 1;
550
551    /* otherwise go with our random seed */
552    return tr_compareUint16( a->random, b->random );
553}
554
555static int
556isPieceInteresting( const tr_torrent  * tor,
557                    tr_piece_index_t    piece )
558{
559    if( tor->info.pieces[piece].dnd ) /* we don't want it */
560        return 0;
561
562    if( tr_cpPieceIsComplete( tor->completion, piece ) ) /* we have it */
563        return 0;
564
565    return 1;
566}
567
568static uint32_t*
569getPreferredPieces( Torrent     * t,
570                    uint32_t    * pieceCount )
571{
572    const tr_torrent * tor = t->tor;
573    const tr_info * inf = &tor->info;
574    tr_piece_index_t i;
575    uint32_t poolSize = 0;
576    uint32_t * pool = tr_new( uint32_t, inf->pieceCount );
577    int peerCount;
578    tr_peer** peers;
579
580    assert( torrentIsLocked( t ) );
581
582    peers = getConnectedPeers( t, &peerCount );
583
584    for( i=0; i<inf->pieceCount; ++i )
585        if( isPieceInteresting( tor, i ) )
586            pool[poolSize++] = i;
587
588    /* sort the pool from most interesting to least... */
589    if( poolSize > 1 )
590    {
591        uint32_t j;
592        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
593
594        for( j=0; j<poolSize; ++j )
595        {
596            int k;
597            const int piece = pool[j];
598            struct tr_refill_piece * setme = p + j;
599
600            setme->piece = piece;
601            setme->priority = inf->pieces[piece].priority;
602            setme->peerCount = 0;
603            setme->random = tr_rand( UINT16_MAX );
604            setme->missingBlockCount = tr_cpMissingBlocksInPiece( tor->completion, piece );
605
606            for( k=0; k<peerCount; ++k ) {
607                const tr_peer * peer = peers[k];
608                if( peer->peerIsInterested && !peer->clientIsChoked && tr_bitfieldHas( peer->have, piece ) )
609                    ++setme->peerCount;
610            }
611        }
612
613        qsort( p, poolSize, sizeof(struct tr_refill_piece), compareRefillPiece );
614
615        for( j=0; j<poolSize; ++j )
616            pool[j] = p[j].piece;
617
618        tr_free( p );
619    }
620
621    tr_free( peers );
622
623    *pieceCount = poolSize;
624    return pool;
625}
626
627static tr_peer**
628getPeersUploadingToClient( Torrent * t, int * setmeCount )
629{
630    int i;
631    int peerCount = 0;
632    int retCount = 0;
633    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
634    tr_peer ** ret = tr_new( tr_peer*, peerCount );
635
636    /* get a list of peers we're downloading from */
637    for( i=0; i<peerCount; ++i )
638        if( clientIsDownloadingFrom( peers[i] ) )
639            ret[retCount++] = peers[i];
640
641    /* pick a different starting point each time so all peers
642     * get a chance at the first blocks in the queue */
643    if( retCount ) {
644        tr_peer ** tmp = tr_new( tr_peer*, retCount );
645        i = tr_rand( retCount );
646        memcpy( tmp, ret, sizeof(tr_peer*) * retCount );
647        memcpy( ret, tmp+i, sizeof(tr_peer*) * (retCount-i) );
648        memcpy( ret+(retCount-i), tmp, sizeof(tr_peer*) * i );
649        tr_free( tmp );
650    }
651
652    *setmeCount = retCount;
653    return ret;
654}
655
656static int
657refillPulse( void * vtorrent )
658{
659    Torrent * t = vtorrent;
660    tr_torrent * tor = t->tor;
661    int peerCount;
662    int webseedCount;
663    tr_peer ** peers;
664    tr_webseed ** webseeds;
665    uint32_t pieceCount;
666    uint32_t * pieces;
667    tr_piece_index_t i;
668
669    if( !t->isRunning )
670        return TRUE;
671    if( tr_torrentIsSeed( t->tor ) )
672        return TRUE;
673
674    torrentLock( t );
675    tordbg( t, "Refilling Request Buffers..." );
676
677    pieces = getPreferredPieces( t, &pieceCount );
678    peers = getPeersUploadingToClient( t, &peerCount );
679    webseedCount = tr_ptrArraySize( t->webseeds );
680    webseeds = tr_memdup( tr_ptrArrayBase(t->webseeds), webseedCount*sizeof(tr_webseed*) );
681
682    for( i=0; (webseedCount || peerCount) && i<pieceCount; ++i )
683    {
684        int j;
685        int handled = FALSE;
686        const tr_piece_index_t piece = pieces[i];
687
688        assert( piece < tor->info.pieceSize );
689
690        /* find a peer who can ask for this piece */
691        for( j=0; !handled && j<peerCount; )
692        {
693            const int val = tr_peerMsgsAddRequest( peers[j]->msgs, piece );
694            switch( val )
695            {
696                case TR_ADDREQ_FULL: 
697                case TR_ADDREQ_CLIENT_CHOKED:
698                    memmove( peers+j, peers+j+1, sizeof(tr_peer*)*(--peerCount-j) );
699                    break;
700                case TR_ADDREQ_MISSING: 
701                case TR_ADDREQ_DUPLICATE: 
702                    ++j;
703                    break;
704                case TR_ADDREQ_OK:
705                    tr_bitfieldAdd( t->requestedPieces, piece );
706                    handled = TRUE;
707                    break;
708                default:
709                    assert( 0 && "unhandled value" );
710                    break;
711            }
712        }
713
714        /* maybe one of the webseeds can do it */
715        for( j=0; !handled && j<webseedCount; )
716        {
717            const int val = tr_webseedAddRequest( webseeds[j], piece );
718            switch( val )
719            {
720                case TR_ADDREQ_FULL: 
721                    memmove( webseeds+j, webseeds+j+1, sizeof(tr_webseed*)*(--webseedCount-j) );
722                    break;
723                case TR_ADDREQ_OK:
724                    tr_bitfieldAdd( t->requestedPieces, piece );
725                    handled = TRUE;
726                    break;
727                default:
728                    assert( 0 && "unhandled value" );
729                    break;
730            }
731        }
732    }
733
734    /* cleanup */
735    tr_free( webseeds );
736    tr_free( peers );
737    tr_free( pieces );
738
739    t->refillTimer = NULL;
740    torrentUnlock( t );
741    return FALSE;
742}
743
744static void
745addStrike( Torrent * t, tr_peer * peer )
746{
747    tordbg( t, "increasing peer %s strike count to %d", tr_peerIoAddrStr(&peer->in_addr,peer->port), peer->strikes+1 );
748
749    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
750    {
751        struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
752        atom->myflags |= MYFLAG_BANNED;
753        peer->doPurge = 1;
754        tordbg( t, "banning peer %s", tr_peerIoAddrStr(&atom->addr,atom->port) );
755    }
756}
757
758static void
759gotBadPiece( Torrent * t, tr_piece_index_t pieceIndex )
760{
761    tr_torrent * tor = t->tor;
762    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
763    tor->corruptCur += byteCount;
764    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
765}
766
767static void
768refillSoon( Torrent * t )
769{
770    if( t->refillTimer == NULL )
771        t->refillTimer = tr_timerNew( t->manager->handle,
772                                      refillPulse, t,
773                                      REFILL_PERIOD_MSEC );
774}
775
776static void
777peerCallbackFunc( void * vpeer, void * vevent, void * vt )
778{
779    tr_peer * peer = vpeer; /* may be NULL if peer is a webseed */
780    Torrent * t = (Torrent *) vt;
781    const tr_peer_event * e = vevent;
782
783    torrentLock( t );
784
785    switch( e->eventType )
786    {
787        case TR_PEER_NEED_REQ:
788            refillSoon( t );
789            break;
790
791        case TR_PEER_CANCEL:
792            tr_bitfieldRem( t->requestedPieces, e->pieceIndex );
793            break;
794
795        case TR_PEER_PEER_GOT_DATA: {
796            const time_t now = time( NULL );
797            tr_torrent * tor = t->tor;
798            tor->activityDate = now;
799            tor->uploadedCur += e->length;
800            tr_rcTransferred( tor->upload, e->length );
801            tr_rcTransferred( tor->handle->upload, e->length );
802            tr_statsAddUploaded( tor->handle, e->length );
803            if( peer ) {
804                struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
805                atom->piece_data_time = time( NULL );
806            }
807            break;
808        }
809
810        case TR_PEER_CLIENT_GOT_DATA: {
811            const time_t now = time( NULL );
812            tr_torrent * tor = t->tor;
813            tor->activityDate = now;
814            /* only add this to downloadedCur if we got it from a peer --
815             * webseeds shouldn't count against our ratio.  As one tracker
816             * admin put it, "Those pieces are downloaded directly from the
817             * content distributor, not the peers, it is the tracker's job
818             * to manage the swarms, not the web server and does not fit
819             * into the jurisdiction of the tracker." */
820            if( peer )
821                tor->downloadedCur += e->length;
822            tr_rcTransferred( tor->download, e->length );
823            tr_rcTransferred( tor->handle->download, e->length );
824            tr_statsAddDownloaded( tor->handle, e->length );
825            if( peer ) {
826                struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
827                atom->piece_data_time = time( NULL );
828            }
829            break;
830        }
831
832        case TR_PEER_PEER_PROGRESS: {
833            if( peer ) {
834                struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
835                const int peerIsSeed = e->progress >= 1.0;
836                if( peerIsSeed ) {
837                    tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr(&atom->addr,atom->port) );
838                    atom->flags |= ADDED_F_SEED_FLAG;
839                } else {
840                    tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr(&atom->addr,atom->port) );
841                    atom->flags &= ~ADDED_F_SEED_FLAG;
842                }
843            }
844            break;
845        }
846
847        case TR_PEER_CLIENT_GOT_BLOCK:
848        {
849            tr_torrent * tor = t->tor;
850
851            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
852
853            tr_cpBlockAdd( tor->completion, block );
854
855            if( tr_cpPieceIsComplete( tor->completion, e->pieceIndex ) )
856            {
857                const tr_piece_index_t p = e->pieceIndex;
858                const tr_errno err = tr_ioTestPiece( tor, p );
859
860                if( err ) {
861                    tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test: %s" ),
862                               (unsigned long)p, tr_errorString( err ) );
863                }
864
865                tr_torrentSetHasPiece( tor, p, !err );
866                tr_torrentSetPieceChecked( tor, p, TRUE );
867                tr_peerMgrSetBlame( tor->handle->peerMgr, tor->info.hash, p, !err );
868
869                if( err )
870                    gotBadPiece( t, p );
871                else {
872                    int i, peerCount;
873                    tr_peer ** peers = getConnectedPeers( t, &peerCount );
874                    for( i=0; i<peerCount; ++i )
875                        tr_peerMsgsHave( peers[i]->msgs, p );
876                    tr_free( peers );
877                }
878
879                tr_torrentRecheckCompleteness( tor );
880            }
881            break;
882        }
883
884        case TR_PEER_ERROR:
885            if( TR_ERROR_IS_IO( e->err ) ) {
886                t->tor->error = e->err;
887                tr_strlcpy( t->tor->errorString, tr_errorString( e->err ), sizeof(t->tor->errorString) );
888                tr_torrentStop( t->tor );
889            } else if( e->err == TR_ERROR_ASSERT ) {
890                addStrike( t, peer );
891            }
892            peer->doPurge = 1;
893            break;
894
895        default:
896            assert(0);
897    }
898
899    torrentUnlock( t );
900}
901
902static void
903ensureAtomExists( Torrent * t, const struct in_addr * addr, uint16_t port, uint8_t flags, uint8_t from )
904{
905    if( getExistingAtom( t, addr ) == NULL )
906    {
907        struct peer_atom * a;
908        a = tr_new0( struct peer_atom, 1 );
909        a->addr = *addr;
910        a->port = port;
911        a->flags = flags;
912        a->from = from;
913        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr(&a->addr,a->port) );
914        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
915    }
916}
917
918static int
919getMaxPeerCount( const tr_torrent * tor UNUSED )
920{
921    return tor->maxConnectedPeers;
922}
923
924/* FIXME: this is kind of a mess. */
925static void
926myHandshakeDoneCB( tr_handshake    * handshake,
927                   tr_peerIo       * io,
928                   int               isConnected,
929                   const uint8_t   * peer_id,
930                   void            * vmanager )
931{
932    int ok = isConnected;
933    uint16_t port;
934    const struct in_addr * addr;
935    tr_peerMgr * manager = (tr_peerMgr*) vmanager;
936    Torrent * t;
937    tr_handshake * ours;
938
939    assert( io != NULL );
940    assert( isConnected==0 || isConnected==1 );
941
942    t = tr_peerIoHasTorrentHash( io )
943        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
944        : NULL;
945
946    if( tr_peerIoIsIncoming ( io ) )
947        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
948                                        handshake, handshakeCompare );
949    else if( t != NULL )
950        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
951                                        handshake, handshakeCompare );
952    else
953        ours = handshake;
954
955    assert( ours != NULL );
956    assert( ours == handshake );
957
958    if( t != NULL )
959        torrentLock( t );
960
961    addr = tr_peerIoGetAddress( io, &port );
962
963    if( !ok || !t || !t->isRunning )
964    {
965        if( t ) {
966            struct peer_atom * atom = getExistingAtom( t, addr );
967            if( atom )
968                ++atom->numFails;
969        }
970
971        tr_peerIoFree( io );
972    }
973    else /* looking good */
974    {
975        struct peer_atom * atom;
976        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
977        atom = getExistingAtom( t, addr );
978
979        if( atom->myflags & MYFLAG_BANNED )
980        {
981            tordbg( t, "banned peer %s tried to reconnect", tr_peerIoAddrStr(&atom->addr,atom->port) );
982            tr_peerIoFree( io );
983        }
984        else if( tr_ptrArraySize( t->peers ) >= getMaxPeerCount( t->tor ) )
985        {
986            tr_peerIoFree( io );
987        }
988        else
989        {
990            tr_peer * peer = getExistingPeer( t, addr );
991
992            if( peer != NULL ) /* we already have this peer */
993            {
994                tr_peerIoFree( io );
995            }
996            else
997            {
998                peer = getPeer( t, addr );
999                tr_free( peer->client );
1000
1001                if( !peer_id )
1002                    peer->client = NULL;
1003                else {
1004                    char client[128];
1005                    tr_clientForId( client, sizeof( client ), peer_id );
1006                    peer->client = tr_strdup( client );
1007                }
1008                peer->port = port;
1009                peer->io = io;
1010                peer->msgs = tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1011                atom->time = time( NULL );
1012            }
1013        }
1014    }
1015
1016    if( t != NULL )
1017        torrentUnlock( t );
1018}
1019
1020void
1021tr_peerMgrAddIncoming( tr_peerMgr      * manager,
1022                       struct in_addr  * addr,
1023                       uint16_t          port,
1024                       int               socket )
1025{
1026    managerLock( manager );
1027
1028    if( tr_sessionIsAddressBlocked( manager->handle, addr ) )
1029    {
1030        tr_dbg( "Banned IP address \"%s\" tried to connect to us",
1031                inet_ntoa( *addr ) );
1032        tr_netClose( socket );
1033    }
1034    else if( getExistingHandshake( manager->incomingHandshakes, addr ) )
1035    {
1036        tr_netClose( socket );
1037    }
1038    else /* we don't have a connetion to them yet... */
1039    {
1040        tr_peerIo * io;
1041        tr_handshake * handshake;
1042
1043        io = tr_peerIoNewIncoming( manager->handle, addr, port, socket );
1044
1045        handshake = tr_handshakeNew( io,
1046                                     manager->handle->encryptionMode,
1047                                     myHandshakeDoneCB,
1048                                     manager );
1049
1050        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake, handshakeCompare );
1051    }
1052
1053    managerUnlock( manager );
1054}
1055
1056void
1057tr_peerMgrAddPex( tr_peerMgr     * manager,
1058                  const uint8_t  * torrentHash,
1059                  uint8_t          from,
1060                  const tr_pex   * pex )
1061{
1062    Torrent * t;
1063    managerLock( manager );
1064
1065    t = getExistingTorrent( manager, torrentHash );
1066    if( !tr_sessionIsAddressBlocked( t->manager->handle, &pex->in_addr ) )
1067        ensureAtomExists( t, &pex->in_addr, pex->port, pex->flags, from );
1068
1069    managerUnlock( manager );
1070}
1071
1072tr_pex *
1073tr_peerMgrCompactToPex( const void  * compact,
1074                        size_t        compactLen,
1075                        const char  * added_f,
1076                        size_t      * pexCount )
1077{
1078    size_t i;
1079    size_t n = compactLen / 6;
1080    const uint8_t * walk = compact;
1081    tr_pex * pex = tr_new0( tr_pex, n );
1082    for( i=0; i<n; ++i ) {
1083        memcpy( &pex[i].in_addr, walk, 4 ); walk += 4;
1084        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1085        if( added_f )
1086            pex[i].flags = added_f[i];
1087    }
1088    *pexCount = n;
1089    return pex;
1090}
1091
1092/**
1093***
1094**/
1095
1096void
1097tr_peerMgrSetBlame( tr_peerMgr     * manager,
1098                    const uint8_t  * torrentHash,
1099                    int              pieceIndex,
1100                    int              success )
1101{
1102    if( !success )
1103    {
1104        int peerCount, i;
1105        Torrent * t = getExistingTorrent( manager, torrentHash );
1106        tr_peer ** peers;
1107
1108        assert( torrentIsLocked( t ) );
1109
1110        peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1111        for( i=0; i<peerCount; ++i )
1112        {
1113            tr_peer * peer = peers[i];
1114            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1115            {
1116                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1117                           tr_peerIoAddrStr(&peer->in_addr,peer->port),
1118                           pieceIndex, (int)peer->strikes+1 );
1119                addStrike( t, peer );
1120            }
1121        }
1122    }
1123}
1124
1125int
1126tr_pexCompare( const void * va, const void * vb )
1127{
1128    const tr_pex * a = (const tr_pex *) va;
1129    const tr_pex * b = (const tr_pex *) vb;
1130    int i = memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
1131    if( i ) return i;
1132    if( a->port < b->port ) return -1;
1133    if( a->port > b->port ) return 1;
1134    return 0;
1135}
1136
1137int tr_pexCompare( const void * a, const void * b );
1138
1139static int
1140peerPrefersCrypto( const tr_peer * peer )
1141{
1142    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1143        return TRUE;
1144
1145    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1146        return FALSE;
1147
1148    return tr_peerIoIsEncrypted( peer->io );
1149};
1150
1151int
1152tr_peerMgrGetPeers( tr_peerMgr      * manager,
1153                    const uint8_t   * torrentHash,
1154                    tr_pex         ** setme_pex )
1155{
1156    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1157    int i, peerCount;
1158    const tr_peer ** peers;
1159    tr_pex * pex;
1160    tr_pex * walk;
1161
1162    managerLock( manager );
1163
1164    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1165    pex = walk = tr_new( tr_pex, peerCount );
1166
1167    for( i=0; i<peerCount; ++i, ++walk )
1168    {
1169        const tr_peer * peer = peers[i];
1170
1171        walk->in_addr = peer->in_addr;
1172
1173        walk->port = peer->port;
1174
1175        walk->flags = 0;
1176        if( peerPrefersCrypto(peer) )  walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1177        if( peer->progress >= 1.0 )    walk->flags |= ADDED_F_SEED_FLAG;
1178    }
1179
1180    assert( ( walk - pex ) == peerCount );
1181    qsort( pex, peerCount, sizeof(tr_pex), tr_pexCompare );
1182    *setme_pex = pex;
1183
1184    managerUnlock( manager );
1185
1186    return peerCount;
1187}
1188
1189static int reconnectPulse( void * vtorrent );
1190static int rechokePulse( void * vtorrent );
1191
1192void
1193tr_peerMgrStartTorrent( tr_peerMgr     * manager,
1194                        const uint8_t  * torrentHash )
1195{
1196    Torrent * t;
1197
1198    managerLock( manager );
1199
1200    t = getExistingTorrent( manager, torrentHash );
1201
1202    assert( t != NULL );
1203    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1204    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1205
1206    if( !t->isRunning )
1207    {
1208        t->isRunning = 1;
1209
1210        t->reconnectTimer = tr_timerNew( t->manager->handle,
1211                                         reconnectPulse, t,
1212                                         RECONNECT_PERIOD_MSEC );
1213
1214        t->rechokeTimer = tr_timerNew( t->manager->handle,
1215                                       rechokePulse, t,
1216                                       RECHOKE_PERIOD_MSEC );
1217
1218        reconnectPulse( t );
1219
1220        rechokePulse( t );
1221
1222        if( !tr_ptrArrayEmpty( t->webseeds ) )
1223            refillSoon( t );
1224    }
1225
1226    managerUnlock( manager );
1227}
1228
1229static void
1230stopTorrent( Torrent * t )
1231{
1232    assert( torrentIsLocked( t ) );
1233
1234    t->isRunning = 0;
1235    tr_timerFree( &t->rechokeTimer );
1236    tr_timerFree( &t->reconnectTimer );
1237
1238    /* disconnect the peers. */
1239    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)peerDestructor );
1240    tr_ptrArrayClear( t->peers );
1241
1242    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1243     * which removes the handshake from t->outgoingHandshakes... */
1244    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1245        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1246}
1247void
1248tr_peerMgrStopTorrent( tr_peerMgr     * manager,
1249                       const uint8_t  * torrentHash)
1250{
1251    managerLock( manager );
1252
1253    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1254
1255    managerUnlock( manager );
1256}
1257
1258void
1259tr_peerMgrAddTorrent( tr_peerMgr * manager,
1260                      tr_torrent * tor )
1261{
1262    Torrent * t;
1263
1264    managerLock( manager );
1265
1266    assert( tor != NULL );
1267    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1268
1269    t = torrentConstructor( manager, tor );
1270    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1271
1272    managerUnlock( manager );
1273}
1274
1275void
1276tr_peerMgrRemoveTorrent( tr_peerMgr     * manager,
1277                         const uint8_t  * torrentHash )
1278{
1279    Torrent * t;
1280
1281    managerLock( manager );
1282
1283    t = getExistingTorrent( manager, torrentHash );
1284    assert( t != NULL );
1285    stopTorrent( t );
1286    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1287    torrentDestructor( t );
1288
1289    managerUnlock( manager );
1290}
1291
1292void
1293tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1294                               const uint8_t    * torrentHash,
1295                               int8_t           * tab,
1296                               int                tabCount )
1297{
1298    int i;
1299    const Torrent * t;
1300    const tr_torrent * tor;
1301    float interval;
1302    int isComplete;
1303    int peerCount;
1304    const tr_peer ** peers;
1305
1306    managerLock( manager );
1307
1308    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1309    tor = t->tor;
1310    interval = tor->info.pieceCount / (float)tabCount;
1311    isComplete = tor && ( tr_cpGetStatus ( tor->completion ) == TR_CP_COMPLETE );
1312    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1313
1314    memset( tab, 0, tabCount );
1315
1316    for( i=0; tor && i<tabCount; ++i )
1317    {
1318        const int piece = i * interval;
1319
1320        if( isComplete || tr_cpPieceIsComplete( tor->completion, piece ) )
1321            tab[i] = -1;
1322        else if( peerCount ) {
1323            int j;
1324            for( j=0; j<peerCount; ++j )
1325                if( tr_bitfieldHas( peers[j]->have, i ) )
1326                    ++tab[i];
1327        }
1328    }
1329
1330    managerUnlock( manager );
1331}
1332
1333/* Returns the pieces that are available from peers */
1334tr_bitfield*
1335tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1336                        const uint8_t    * torrentHash )
1337{
1338    int i, size;
1339    Torrent * t;
1340    tr_peer ** peers;
1341    tr_bitfield * pieces;
1342    managerLock( manager );
1343
1344    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1345    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
1346    peers = getConnectedPeers( t, &size );
1347    for( i=0; i<size; ++i )
1348        tr_bitfieldOr( pieces, peers[i]->have );
1349
1350    managerUnlock( manager );
1351    tr_free( peers );
1352    return pieces;
1353}
1354
1355int
1356tr_peerMgrHasConnections( const tr_peerMgr * manager,
1357                          const uint8_t    * torrentHash )
1358{
1359    int ret;
1360    const Torrent * t;
1361    managerLock( manager );
1362
1363    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1364    ret = t && ( !tr_ptrArrayEmpty( t->peers ) || !tr_ptrArrayEmpty( t->webseeds ) );
1365
1366    managerUnlock( manager );
1367    return ret;
1368}
1369
1370void
1371tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1372                        const uint8_t    * torrentHash,
1373                        int              * setmePeersKnown,
1374                        int              * setmePeersConnected,
1375                        int              * setmeSeedsConnected,
1376                        int              * setmeWebseedsSendingToUs,
1377                        int              * setmePeersSendingToUs,
1378                        int              * setmePeersGettingFromUs,
1379                        int              * setmePeersFrom )
1380{
1381    int i, size;
1382    const Torrent * t;
1383    const tr_peer ** peers;
1384    const tr_webseed ** webseeds;
1385
1386    managerLock( manager );
1387
1388    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1389    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1390
1391    *setmePeersKnown           = tr_ptrArraySize( t->pool );
1392    *setmePeersConnected       = 0;
1393    *setmeSeedsConnected       = 0;
1394    *setmePeersGettingFromUs   = 0;
1395    *setmePeersSendingToUs     = 0;
1396    *setmeWebseedsSendingToUs  = 0;
1397
1398    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1399        setmePeersFrom[i] = 0;
1400
1401    for( i=0; i<size; ++i )
1402    {
1403        const tr_peer * peer = peers[i];
1404        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1405
1406        if( peer->io == NULL ) /* not connected */
1407            continue;
1408
1409        ++*setmePeersConnected;
1410
1411        ++setmePeersFrom[atom->from];
1412
1413        if( clientIsDownloadingFrom( peer ) )
1414            ++*setmePeersSendingToUs;
1415
1416        if( clientIsUploadingTo( peer ) )
1417            ++*setmePeersGettingFromUs;
1418
1419        if( atom->flags & ADDED_F_SEED_FLAG )
1420            ++*setmeSeedsConnected;
1421    }
1422
1423    webseeds = (const tr_webseed **) tr_ptrArrayPeek( t->webseeds, &size );
1424    for( i=0; i<size; ++i )
1425    {
1426        if( tr_webseedIsActive( webseeds[i] ) )
1427            ++*setmeWebseedsSendingToUs;
1428    }
1429
1430    managerUnlock( manager );
1431}
1432
1433float*
1434tr_peerMgrWebSpeeds( const tr_peerMgr * manager,
1435                     const uint8_t    * torrentHash )
1436{
1437    const Torrent * t;
1438    const tr_webseed ** webseeds;
1439    int i;
1440    int webseedCount;
1441    float * ret;
1442
1443    assert( manager );
1444    managerLock( manager );
1445
1446    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1447    webseeds = (const tr_webseed**) tr_ptrArrayPeek( t->webseeds, &webseedCount );
1448    assert( webseedCount == t->tor->info.webseedCount );
1449    ret = tr_new0( float, webseedCount );
1450
1451    for( i=0; i<webseedCount; ++i )
1452        if( !tr_webseedGetSpeed( webseeds[i], &ret[i] ) )
1453            ret[i] = -1.0;
1454
1455    managerUnlock( manager );
1456    return ret;
1457}
1458
1459struct tr_peer_stat *
1460tr_peerMgrPeerStats( const tr_peerMgr  * manager,
1461                     const uint8_t     * torrentHash,
1462                     int               * setmeCount UNUSED )
1463{
1464    int i, size;
1465    const Torrent * t;
1466    tr_peer ** peers;
1467    tr_peer_stat * ret;
1468
1469    assert( manager );
1470    managerLock( manager );
1471
1472    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1473    peers = getConnectedPeers( (Torrent*)t, &size );
1474    ret = tr_new0( tr_peer_stat, size );
1475
1476    for( i=0; i<size; ++i )
1477    {
1478        char * pch;
1479        const tr_peer * peer = peers[i];
1480        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1481        tr_peer_stat * stat = ret + i;
1482
1483        tr_netNtop( &peer->in_addr, stat->addr, sizeof(stat->addr) );
1484        tr_strlcpy( stat->client, (peer->client ? peer->client : ""), sizeof(stat->client) );
1485        stat->port               = peer->port;
1486        stat->from               = atom->from;
1487        stat->progress           = peer->progress;
1488        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1489        stat->uploadToRate       = peer->rateToPeer;
1490        stat->downloadFromRate   = peer->rateToClient;
1491        stat->peerIsChoked       = peer->peerIsChoked;
1492        stat->peerIsInterested   = peer->peerIsInterested;
1493        stat->clientIsChoked     = peer->clientIsChoked;
1494        stat->clientIsInterested = peer->clientIsInterested;
1495        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1496        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
1497        stat->isUploadingTo      = clientIsUploadingTo( peer );
1498
1499        pch = stat->flagStr;
1500        if( t->optimistic == peer ) *pch++ = 'O';
1501        if( stat->isDownloadingFrom ) *pch++ = 'D';
1502        else if( stat->clientIsInterested ) *pch++ = 'd';
1503        if( stat->isUploadingTo ) *pch++ = 'U';
1504        else if( stat->peerIsInterested ) *pch++ = 'u';
1505        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
1506        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
1507        if( stat->isEncrypted ) *pch++ = 'E';
1508        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
1509        if( stat->isIncoming ) *pch++ = 'I';
1510        *pch = '\0';
1511    }
1512
1513    *setmeCount = size;
1514    tr_free( peers );
1515
1516    managerUnlock( manager );
1517    return ret;
1518}
1519
1520/**
1521***
1522**/
1523
1524struct ChokeData
1525{
1526    unsigned int  doUnchoke     : 1;
1527    unsigned int  isInterested  : 1;
1528    uint32_t rate;
1529    tr_peer * peer;
1530};
1531
1532static int
1533compareChoke( const void * va, const void * vb )
1534{
1535    const struct ChokeData * a = va;
1536    const struct ChokeData * b = vb;
1537    int diff = 0;
1538
1539    if( diff == 0 ) /* prefer higher dl speeds */
1540        diff = -tr_compareDouble( a->peer->rateToClient, b->peer->rateToClient );
1541    if( diff == 0 ) /* prefer higher ul speeds */
1542        diff = -tr_compareDouble( a->peer->rateToPeer, b->peer->rateToPeer );
1543    if( diff == 0 ) /* prefer unchoked */
1544        diff = (int)a->peer->peerIsChoked - (int)b->peer->peerIsChoked;
1545
1546    return diff;
1547}
1548
1549static int
1550isNew( const tr_peer * peer )
1551{
1552    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
1553}
1554
1555static int
1556isSame( const tr_peer * peer )
1557{
1558    return peer && peer->client && strstr( peer->client, "Transmission" );
1559}
1560
1561/**
1562***
1563**/
1564
1565static int
1566getWeightedRate( const tr_peer * peer, int clientIsSeed )
1567{
1568    return (int)( 10.0 * ( clientIsSeed ? peer->rateToPeer
1569                                        : peer->rateToClient ) );
1570}
1571
1572static void
1573rechoke( Torrent * t )
1574{
1575    int i, peerCount, size, unchokedInterested;
1576    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1577    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1578    const int clientIsSeed = tr_torrentIsSeed( t->tor );
1579
1580    assert( torrentIsLocked( t ) );
1581   
1582    /* sort the peers by preference and rate */
1583    for( i=0, size=0; i<peerCount; ++i )
1584    {
1585        tr_peer * peer = peers[i];
1586        if( peer->progress >= 1.0 ) /* choke all seeds */
1587            tr_peerMsgsSetChoke( peer->msgs, TRUE );
1588        else {
1589            struct ChokeData * node = &choke[size++];
1590            node->peer = peer;
1591            node->isInterested = peer->peerIsInterested;
1592            node->rate = getWeightedRate( peer, clientIsSeed );
1593        }
1594    }
1595
1596    qsort( choke, size, sizeof(struct ChokeData), compareChoke );
1597
1598    /**
1599     * Reciprocation and number of uploads capping is managed by unchoking
1600     * the N peers which have the best upload rate and are interested.
1601     * This maximizes the client's download rate. These N peers are
1602     * referred to as downloaders, because they are interested in downloading
1603     * from the client.
1604     *
1605     * Peers which have a better upload rate (as compared to the downloaders)
1606     * but aren't interested get unchoked. If they become interested, the
1607     * downloader with the worst upload rate gets choked. If a client has
1608     * a complete file, it uses its upload rate rather than its download
1609     * rate to decide which peers to unchoke.
1610     */
1611    unchokedInterested = 0;
1612    for( i=0; i<size && unchokedInterested<MAX_UNCHOKED_PEERS; ++i ) {
1613        choke[i].doUnchoke = 1;
1614        if( choke[i].isInterested )
1615            ++unchokedInterested;
1616    }
1617
1618    /* optimistic unchoke */
1619    if( i < size )
1620    {
1621        int n;
1622        struct ChokeData * c;
1623        tr_ptrArray * randPool = tr_ptrArrayNew( );
1624
1625        for( ; i<size; ++i )
1626        {
1627            if( choke[i].isInterested )
1628            {
1629                const tr_peer * peer = choke[i].peer;
1630                int x=1, y;
1631                if( isNew( peer ) ) x *= 3;
1632                if( isSame( peer ) ) x *= 3;
1633                for( y=0; y<x; ++y )
1634                    tr_ptrArrayAppend( randPool, &choke[i] );
1635            }
1636        }
1637
1638        if(( n = tr_ptrArraySize( randPool )))
1639        {
1640            c = tr_ptrArrayNth( randPool, tr_rand( n ));
1641            c->doUnchoke = 1;
1642            t->optimistic = c->peer;
1643        }
1644
1645        tr_ptrArrayFree( randPool, NULL );
1646    }
1647
1648    for( i=0; i<size; ++i )
1649        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1650
1651    /* cleanup */
1652    tr_free( choke );
1653    tr_free( peers );
1654}
1655
1656static int
1657rechokePulse( void * vtorrent )
1658{
1659    Torrent * t = vtorrent;
1660    torrentLock( t );
1661    rechoke( t );
1662    torrentUnlock( t );
1663    return TRUE;
1664}
1665
1666/***
1667****
1668****  Life and Death
1669****
1670***/
1671
1672static int
1673shouldPeerBeClosed( const Torrent * t, const tr_peer * peer, int peerCount )
1674{
1675    const tr_torrent * tor = t->tor;
1676    const time_t now = time( NULL );
1677    const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1678
1679    /* if it's marked for purging, close it */
1680    if( peer->doPurge ) {
1681        tordbg( t, "purging peer %s because its doPurge flag is set", tr_peerIoAddrStr(&atom->addr,atom->port) );
1682        return TRUE;
1683    }
1684
1685    /* if we're seeding and the peer has everything we have,
1686     * and enough time has passed for a pex exchange, then disconnect */
1687    if( tr_torrentIsSeed( tor ) ) {
1688        int peerHasEverything;
1689        if( atom->flags & ADDED_F_SEED_FLAG )
1690            peerHasEverything = TRUE;
1691        else if( peer->progress < tr_cpPercentDone( tor->completion ) )
1692            peerHasEverything = FALSE;
1693        else {
1694            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( tor->completion ) );
1695            tr_bitfieldDifference( tmp, peer->have );
1696            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
1697            tr_bitfieldFree( tmp );
1698        }
1699        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30) ) ) {
1700            tordbg( t, "purging peer %s because we're both seeds", tr_peerIoAddrStr(&atom->addr,atom->port) );
1701            return TRUE;
1702        }
1703    }
1704
1705    /* disconnect if it's been too long since piece data has been transferred.
1706     * this is on a sliding scale based on number of available peers... */
1707    {
1708        const int relaxStrictnessIfFewerThanN = (int)((getMaxPeerCount(tor) * 0.9) + 0.5);
1709        /* if we have >= relaxIfFewerThan, strictness is 100%.
1710         * if we have zero connections, strictness is 0% */
1711        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
1712            ? 1.0
1713            : peerCount / (float)relaxStrictnessIfFewerThanN;
1714        const int lo = MIN_UPLOAD_IDLE_SECS;
1715        const int hi = MAX_UPLOAD_IDLE_SECS;
1716        const int limit = lo + ((hi-lo) * strictness);
1717        const time_t then = peer->pieceDataActivityDate;
1718        const int idleTime = then ? (now-then) : 0;
1719        if( idleTime > limit ) {
1720            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
1721                       tr_peerIoAddrStr(&atom->addr,atom->port), idleTime );
1722            return TRUE;
1723        }
1724    }
1725
1726    return FALSE;
1727}
1728
1729static tr_peer **
1730getPeersToClose( Torrent * t, int * setmeSize )
1731{
1732    int i, peerCount, outsize;
1733    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( t->peers, &peerCount );
1734    struct tr_peer ** ret = tr_new( tr_peer*, peerCount );
1735
1736    assert( torrentIsLocked( t ) );
1737
1738    for( i=outsize=0; i<peerCount; ++i )
1739        if( shouldPeerBeClosed( t, peers[i], peerCount ) )
1740            ret[outsize++] = peers[i];
1741
1742    *setmeSize = outsize;
1743    return ret;
1744}
1745
1746static int
1747compareCandidates( const void * va, const void * vb )
1748{
1749    const struct peer_atom * a = * (const struct peer_atom**) va;
1750    const struct peer_atom * b = * (const struct peer_atom**) vb;
1751    int i;
1752
1753    if( a->piece_data_time > b->piece_data_time ) return -1;
1754    if( a->piece_data_time < b->piece_data_time ) return  1;
1755
1756    if(( i = tr_compareUint16( a->numFails, b->numFails )))
1757        return i;
1758
1759    if( a->time != b->time )
1760        return a->time < b->time ? -1 : 1;
1761
1762    return 0;
1763}
1764
1765static struct peer_atom **
1766getPeerCandidates( Torrent * t, int * setmeSize )
1767{
1768    int i, atomCount, retCount;
1769    struct peer_atom ** atoms;
1770    struct peer_atom ** ret;
1771    const time_t now = time( NULL );
1772    const int seed = tr_torrentIsSeed( t->tor );
1773
1774    assert( torrentIsLocked( t ) );
1775
1776    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &atomCount );
1777    ret = tr_new( struct peer_atom*, atomCount );
1778    for( i=retCount=0; i<atomCount; ++i )
1779    {
1780        struct peer_atom * atom = atoms[i];
1781
1782        /* peer fed us too much bad data ... we only keep it around
1783         * now to weed it out in case someone sends it to us via pex */
1784        if( atom->myflags & MYFLAG_BANNED )
1785            continue;
1786
1787        /* peer was unconnectable before, so we're not going to keep trying.
1788         * this is needs a separate flag from `banned', since if they try
1789         * to connect to us later, we'll let them in */
1790        if( atom->myflags & MYFLAG_UNREACHABLE )
1791            continue;
1792
1793        /* we don't need two connections to the same peer... */
1794        if( peerIsInUse( t, &atom->addr ) )
1795            continue;
1796
1797        /* no need to connect if we're both seeds... */
1798        if( seed && (atom->flags & ADDED_F_SEED_FLAG) )
1799            continue;
1800
1801        /* we're wasting our time trying to connect to this bozo. */
1802        if( atom->numFails > 3 )
1803            continue;
1804
1805        /* If we were connected to this peer recently and transferring
1806         * piece data, try to reconnect -- network troubles may have
1807         * disconnected us.  but if we weren't sharing piece data,
1808         * hold off on this peer to give another one a try instead */
1809        if( ( now - atom->piece_data_time ) > 30 )
1810        {
1811            int minWait = (60 * 5); /* five minutes */
1812            int maxWait = minWait * 3;
1813            int wait = atom->numFails * minWait;
1814            if( wait < minWait ) wait = minWait;
1815            if( wait > maxWait ) wait = maxWait;
1816            if( ( now - atom->time ) < wait ) {
1817                tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
1818                        i, tr_peerIoAddrStr(&atom->addr,atom->port), wait );
1819                continue;
1820            }
1821        }
1822
1823        /* Don't connect to peers in our blocklist */
1824        if( tr_sessionIsAddressBlocked( t->manager->handle, &atom->addr ) )
1825            continue;
1826
1827        ret[retCount++] = atom;
1828    }
1829
1830    qsort( ret, retCount, sizeof(struct peer_atom*), compareCandidates );
1831    *setmeSize = retCount;
1832    return ret;
1833}
1834
1835static int
1836reconnectPulse( void * vtorrent )
1837{
1838    Torrent * t = vtorrent;
1839    static time_t prevTime = 0;
1840    static int newConnectionsThisSecond = 0;
1841    time_t now;
1842
1843    torrentLock( t );
1844
1845    now = time( NULL );
1846    if( prevTime != now )
1847    {
1848        prevTime = now;
1849        newConnectionsThisSecond = 0;
1850    }
1851
1852    if( !t->isRunning )
1853    {
1854        removeAllPeers( t );
1855    }
1856    else
1857    {
1858        int i, nCandidates, nBad;
1859        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
1860        struct tr_peer ** connections = getPeersToClose( t, &nBad );
1861
1862        if( nBad || nCandidates )
1863            tordbg( t, "reconnect pulse for [%s]: %d bad connections, "
1864                       "%d connection candidates, %d atoms, max per pulse is %d",
1865                       t->tor->info.name, nBad, nCandidates,
1866                       tr_ptrArraySize(t->pool),
1867                       (int)MAX_RECONNECTIONS_PER_PULSE );
1868
1869        /* disconnect some peers.
1870           if we got transferred piece data, then they might be good peers,
1871           so reset their `numFails' weight to zero.  otherwise we connected
1872           to them fruitlessly, so mark it as another fail */
1873        for( i=0; i<nBad; ++i ) {
1874            tr_peer * peer = connections[i];
1875            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1876            if( peer->pieceDataActivityDate )
1877                atom->numFails = 0;
1878            else
1879                ++atom->numFails;
1880            removePeer( t, peer );
1881        }
1882
1883        /* add some new ones */
1884        for( i=0;    i < nCandidates
1885                  && i < MAX_RECONNECTIONS_PER_PULSE
1886                  && newConnectionsThisSecond < MAX_CONNECTIONS_PER_SECOND; ++i )
1887        {
1888            tr_peerMgr * mgr = t->manager;
1889            struct peer_atom * atom = candidates[i];
1890            tr_peerIo * io;
1891
1892            tordbg( t, "Starting an OUTGOING connection with %s",
1893                       tr_peerIoAddrStr( &atom->addr, atom->port ) );
1894
1895            io = tr_peerIoNewOutgoing( mgr->handle, &atom->addr, atom->port, t->hash );
1896            if( io == NULL )
1897            {
1898                atom->myflags |= MYFLAG_UNREACHABLE;
1899            }
1900            else
1901            {
1902                tr_handshake * handshake = tr_handshakeNew( io,
1903                                                            mgr->handle->encryptionMode,
1904                                                            myHandshakeDoneCB,
1905                                                            mgr );
1906
1907                assert( tr_peerIoGetTorrentHash( io ) != NULL );
1908
1909                ++newConnectionsThisSecond;
1910
1911                tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake, handshakeCompare );
1912            }
1913
1914            atom->time = time( NULL );
1915        }
1916
1917        /* cleanup */
1918        tr_free( connections );
1919        tr_free( candidates );
1920    }
1921
1922    torrentUnlock( t );
1923    return TRUE;
1924}
Note: See TracBrowser for help on using the repository browser.