source: trunk/libtransmission/peer-mgr.c @ 3322

Last change on this file since 3322 was 3322, checked in by charles, 15 years ago

the purge flag should take priority over all the other reasons to keep or purge a peer. :p

  • Property svn:keywords set to Date Rev Author Id
File size: 44.9 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 3322 2007-10-08 01:53:11Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17#include <limits.h> /* INT_MAX */
18
19#include <sys/types.h> /* event.h needs this */
20#include <event.h>
21
22#include "transmission.h"
23#include "clients.h"
24#include "completion.h"
25#include "crypto.h"
26#include "handshake.h"
27#include "net.h"
28#include "peer-io.h"
29#include "peer-mgr.h"
30#include "peer-mgr-private.h"
31#include "peer-msgs.h"
32#include "platform.h"
33#include "ptrarray.h"
34#include "ratecontrol.h"
35#include "shared.h"
36#include "trevent.h"
37#include "utils.h"
38
39enum
40{
41    /* how frequently to change which peers are choked */
42    RECHOKE_PERIOD_MSEC = (1000),
43
44    /* how frequently to decide which peers live and die */
45    RECONNECT_PERIOD_MSEC = (5 * 1000),
46
47    /* how frequently to refill peers' request lists */
48    REFILL_PERIOD_MSEC = 666,
49
50    /* don't change a peer's choke status more often than this */
51    MIN_CHOKE_PERIOD_SEC = 10,
52
53    /* following the BT spec, we consider ourselves `snubbed' if
54     * we're we don't get piece data from a peer in this long */
55    SNUBBED_SEC = 60,
56
57    /* this is arbitrary and, hopefully, temporary until we come up
58     * with a better idea for managing the connection limits */
59    MAX_CONNECTED_PEERS_PER_TORRENT = 60,
60
61    /* how many peers to unchoke per-torrent. */
62    /* FIXME: make this user-configurable? */
63    NUM_UNCHOKED_PEERS_PER_TORRENT = 12, /* arbitrary */
64
65    /* set this too high and there will be a lot of churn.
66     * set it too low and you'll get peers too slowly */
67    MAX_RECONNECTIONS_PER_PULSE = 8,
68
69    /* corresponds to ut_pex's added.f flags */
70    ADDED_F_ENCRYPTION_FLAG = 1,
71    /* corresponds to ut_pex's added.f flags */
72    ADDED_F_SEED_FLAG = 2,
73
74    /* number of bad pieces a peer is allowed to send before we ban them */
75    MAX_BAD_PIECES_PER_PEER = 3,
76    /* use for bitwise operations w/peer_atom.myflags */
77    MYFLAG_BANNED = 1
78};
79
80
81/**
82***
83**/
84
85/* We keep one of these for every peer we know about, whether
86 * it's connected or not, so the struct must be small.
87 * When our current connections underperform, we dip back
88 * into this list for new ones. */
89struct peer_atom
90{   
91    uint8_t from;
92    uint8_t flags; /* these match the added_f flags */
93    uint8_t myflags; /* flags that aren't defined in added_f */
94    uint16_t port;
95    struct in_addr addr; 
96    time_t time;
97};
98
99typedef struct
100{
101    uint8_t hash[SHA_DIGEST_LENGTH];
102    tr_ptrArray * outgoingHandshakes; /* tr_handshake */
103    tr_ptrArray * pool; /* struct peer_atom */
104    tr_ptrArray * peers; /* tr_peer */
105    tr_timer * reconnectTimer;
106    tr_timer * rechokeTimer;
107    tr_timer * refillTimer;
108    tr_torrent * tor;
109    tr_bitfield * requested;
110
111    unsigned int isRunning : 1;
112
113    struct tr_peerMgr * manager;
114}
115Torrent;
116
117struct tr_peerMgr
118{
119    int connectionCount;
120    tr_handle * handle;
121    tr_ptrArray * torrents; /* Torrent */
122    tr_ptrArray * incomingHandshakes; /* tr_handshake */
123};
124
125/**
126***
127**/
128
129static void
130myDebug( const char * file, int line, const Torrent * t, const char * fmt, ... )
131{
132    FILE * fp = tr_getLog( );
133    if( fp != NULL )
134    {
135        va_list args;
136        struct evbuffer * buf = evbuffer_new( );
137        char timestr[64];
138        evbuffer_add_printf( buf, "[%s] %s: ",
139                             tr_getLogTimeStr( timestr, sizeof(timestr) ),
140                             t->tor->info.name );
141        va_start( args, fmt );
142        evbuffer_add_vprintf( buf, fmt, args );
143        va_end( args );
144        evbuffer_add_printf( buf, " (%s:%d)\n", file, line );
145
146        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
147        evbuffer_free( buf );
148    }
149}
150
151#define tordbg(t, fmt...) myDebug(__FILE__, __LINE__, t, ##fmt )
152
153/**
154***
155**/
156
157static void
158managerLock( struct tr_peerMgr * manager )
159{
160    tr_globalLock( manager->handle );
161}
162static void
163managerUnlock( struct tr_peerMgr * manager )
164{
165    tr_globalUnlock( manager->handle );
166}
167static void
168torrentLock( Torrent * torrent )
169{
170    managerLock( torrent->manager );
171}
172static void
173torrentUnlock( Torrent * torrent )
174{
175    managerUnlock( torrent->manager );
176}
177static int
178torrentIsLocked( const Torrent * t )
179{
180    return tr_globalIsLocked( t->manager->handle );
181}
182
183/**
184***
185**/
186
187static int
188compareAddresses( const struct in_addr * a, const struct in_addr * b )
189{
190    return tr_compareUint32( a->s_addr, b->s_addr );
191}
192
193static int
194handshakeCompareToAddr( const void * va, const void * vb )
195{
196    const tr_handshake * a = va;
197    return compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
198}
199
200static int
201handshakeCompare( const void * a, const void * b )
202{
203    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
204}
205
206static tr_handshake*
207getExistingHandshake( tr_ptrArray * handshakes, const struct in_addr * in_addr )
208{
209    return tr_ptrArrayFindSorted( handshakes,
210                                  in_addr,
211                                  handshakeCompareToAddr );
212}
213
214static int
215comparePeerAtomToAddress( const void * va, const void * vb )
216{
217    const struct peer_atom * a = va;
218    return compareAddresses( &a->addr, vb );
219}
220
221static int
222comparePeerAtoms( const void * va, const void * vb )
223{
224    const struct peer_atom * b = vb;
225    return comparePeerAtomToAddress( va, &b->addr );
226}
227
228/**
229***
230**/
231
232static int
233torrentCompare( const void * va, const void * vb )
234{
235    const Torrent * a = va;
236    const Torrent * b = vb;
237    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
238}
239
240static int
241torrentCompareToHash( const void * va, const void * vb )
242{
243    const Torrent * a = va;
244    const uint8_t * b_hash = vb;
245    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
246}
247
248static Torrent*
249getExistingTorrent( tr_peerMgr * manager, const uint8_t * hash )
250{
251    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
252                                             hash,
253                                             torrentCompareToHash );
254}
255
256static int
257peerCompare( const void * va, const void * vb )
258{
259    const tr_peer * a = va;
260    const tr_peer * b = vb;
261    return compareAddresses( &a->in_addr, &b->in_addr );
262}
263
264static int
265peerCompareToAddr( const void * va, const void * vb )
266{
267    const tr_peer * a = va;
268    return compareAddresses( &a->in_addr, vb );
269}
270
271static tr_peer*
272getExistingPeer( Torrent * torrent, const struct in_addr * in_addr )
273{
274    assert( torrentIsLocked( torrent ) );
275    assert( in_addr != NULL );
276
277    return (tr_peer*) tr_ptrArrayFindSorted( torrent->peers,
278                                             in_addr,
279                                             peerCompareToAddr );
280}
281
282static struct peer_atom*
283getExistingAtom( const Torrent * t, const struct in_addr * addr )
284{
285    assert( torrentIsLocked( t ) );
286    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
287}
288
289static int
290peerIsKnown( const Torrent * t, const struct in_addr * addr )
291{
292    return getExistingAtom( t, addr ) != NULL;
293}
294
295static int
296peerIsInUse( const Torrent * ct, const struct in_addr * addr )
297{
298    Torrent * t = (Torrent*) ct;
299
300    assert( torrentIsLocked ( t ) );
301
302    return getExistingPeer( t, addr )
303        || getExistingHandshake( t->outgoingHandshakes, addr )
304        || getExistingHandshake( t->manager->incomingHandshakes, addr );
305}
306
307static tr_peer*
308peerConstructor( const struct in_addr * in_addr )
309{
310    tr_peer * p;
311    p = tr_new0( tr_peer, 1 );
312    p->rcToClient = tr_rcInit( );
313    p->rcToPeer = tr_rcInit( );
314    memcpy( &p->in_addr, in_addr, sizeof(struct in_addr) );
315    return p;
316}
317
318static tr_peer*
319getPeer( Torrent * torrent, const struct in_addr * in_addr )
320{
321    tr_peer * peer;
322
323    assert( torrentIsLocked( torrent ) );
324
325    peer = getExistingPeer( torrent, in_addr );
326
327    if( peer == NULL ) {
328        peer = peerConstructor( in_addr );
329        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
330    }
331
332    return peer;
333}
334
335static void
336peerDestructor( tr_peer * peer )
337{
338    assert( peer != NULL );
339    assert( peer->msgs != NULL );
340
341    tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
342    tr_peerMsgsFree( peer->msgs );
343
344    tr_peerIoFree( peer->io );
345
346    tr_bitfieldFree( peer->have );
347    tr_bitfieldFree( peer->blame );
348    tr_rcClose( peer->rcToClient );
349    tr_rcClose( peer->rcToPeer );
350    tr_free( peer->client );
351    tr_free( peer );
352}
353
354static void
355removePeer( Torrent * t, tr_peer * peer )
356{
357    tr_peer * removed;
358    struct peer_atom * atom;
359
360    assert( torrentIsLocked( t ) );
361
362    atom = getExistingAtom( t, &peer->in_addr );
363    assert( atom != NULL );
364    atom->time = time( NULL );
365
366    removed = tr_ptrArrayRemoveSorted  ( t->peers, peer, peerCompare );
367    assert( removed == peer );
368    peerDestructor( removed );
369}
370
371static void
372removeAllPeers( Torrent * t )
373{
374    while( !tr_ptrArrayEmpty( t->peers ) )
375        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
376}
377
378static void
379torrentDestructor( Torrent * t )
380{
381    uint8_t hash[SHA_DIGEST_LENGTH];
382
383    assert( t != NULL );
384    assert( !t->isRunning );
385    assert( t->peers != NULL );
386    assert( torrentIsLocked( t ) );
387    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
388    assert( tr_ptrArrayEmpty( t->peers ) );
389
390    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
391
392    tr_timerFree( &t->reconnectTimer );
393    tr_timerFree( &t->rechokeTimer );
394    tr_timerFree( &t->refillTimer );
395
396    tr_bitfieldFree( t->requested );
397    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
398    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
399    tr_ptrArrayFree( t->peers, NULL );
400
401    tr_free( t );
402}
403
404static Torrent*
405torrentConstructor( tr_peerMgr * manager, tr_torrent * tor )
406{
407    Torrent * t;
408
409    t = tr_new0( Torrent, 1 );
410    t->manager = manager;
411    t->tor = tor;
412    t->pool = tr_ptrArrayNew( );
413    t->peers = tr_ptrArrayNew( );
414    t->outgoingHandshakes = tr_ptrArrayNew( );
415    t->requested = tr_bitfieldNew( tor->blockCount );
416    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
417
418    return t;
419}
420
421/**
422***
423**/
424
425struct tr_bitfield *
426tr_peerMgrGenerateAllowedSet( const uint32_t         setCount,
427                              const uint32_t         pieceCount,
428                              const uint8_t          infohash[20],
429                              const struct in_addr * ip )
430{
431    /* This has been checked against the spec example implementation. Feeding it with :
432    setCount = 9, pieceCount = 1313, infohash = Oxaa,0xaa,...0xaa, ip = 80.4.4.200
433generate :
434    1059, 431, 808, 1217, 287, 376, 1188, 353, 508
435    but since we're storing in a bitfield, it won't be in this order... */
436    /* TODO : We should translate link-local IPv4 adresses to external IP,
437     * so that being on same local network gives us the same allowed pieces */
438   
439    printf( "%d piece allowed fast set for torrent with %d pieces and hex infohash\n", setCount, pieceCount );
440    printf( "%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x for node with IP %s:\n",
441            infohash[0], infohash[1], infohash[2], infohash[3], infohash[4], infohash[5], infohash[6], infohash[7], infohash[8], infohash[9],
442            infohash[10], infohash[11], infohash[12], infohash[13], infohash[14], infohash[15], infohash[16], infohash[7], infohash[18], infohash[19],
443            inet_ntoa( *ip ) );
444   
445    uint8_t *seed = malloc(4 + SHA_DIGEST_LENGTH);
446    char buf[4];
447    uint32_t allowedPieceCount = 0;
448    tr_bitfield_t * ret;
449   
450    ret = tr_bitfieldNew( pieceCount );
451   
452    /* We need a seed based on most significant bytes of peer address
453        concatenated with torrent's infohash */
454    *(uint32_t*)buf = ntohl( htonl(ip->s_addr) & 0xffffff00 );
455   
456    memcpy( seed, &buf, 4 );
457    memcpy( seed + 4, infohash, SHA_DIGEST_LENGTH );
458   
459    tr_sha1( seed, seed, 4 + SHA_DIGEST_LENGTH, NULL );
460   
461    while ( allowedPieceCount < setCount )
462    {
463        int i;
464        for ( i = 0 ; i < 5 && allowedPieceCount < setCount ; i++ )
465        {
466            /* We generate indices from 4-byte chunks of the seed */
467            uint32_t j = i * 4;
468            uint32_t y = ntohl( *(uint32_t*)(seed + j) );
469            uint32_t index = y % pieceCount;
470           
471            if ( !tr_bitfieldHas( ret, index ) )
472            {
473                tr_bitfieldAdd( ret, index );
474                allowedPieceCount++;
475            }
476        }
477        /* We randomize the seed, in case we need to iterate more */
478        tr_sha1( seed, seed, SHA_DIGEST_LENGTH, NULL );
479    }
480    tr_free( seed );
481   
482    return ret;
483}
484
485tr_peerMgr*
486tr_peerMgrNew( tr_handle * handle )
487{
488    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
489    m->handle = handle;
490    m->torrents = tr_ptrArrayNew( );
491    m->incomingHandshakes = tr_ptrArrayNew( );
492    return m;
493}
494
495void
496tr_peerMgrFree( tr_peerMgr * manager )
497{
498    managerLock( manager );
499
500    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
501     * the item from manager->handshakes, so this is a little roundabout... */
502    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
503        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
504    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
505
506    /* free the torrents. */
507    tr_ptrArrayFree( manager->torrents, (PtrArrayForeachFunc)torrentDestructor );
508
509    managerUnlock( manager );
510    tr_free( manager );
511}
512
513static tr_peer**
514getConnectedPeers( Torrent * t, int * setmeCount )
515{
516    int i, peerCount, connectionCount;
517    tr_peer **peers;
518    tr_peer **ret;
519
520    assert( torrentIsLocked( t ) );
521
522    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
523    ret = tr_new( tr_peer*, peerCount );
524
525    for( i=connectionCount=0; i<peerCount; ++i )
526        if( peers[i]->msgs != NULL )
527            ret[connectionCount++] = peers[i];
528
529    *setmeCount = connectionCount;
530    return ret;
531}
532
533/***
534****  Refill
535***/
536
537struct tr_refill_piece
538{
539    tr_priority_t priority;
540    uint16_t random;
541    uint32_t piece;
542    uint32_t peerCount;
543    uint32_t fastAllowed;
544};
545
546static int
547compareRefillPiece (const void * aIn, const void * bIn)
548{
549    const struct tr_refill_piece * a = aIn;
550    const struct tr_refill_piece * b = bIn;
551
552    /* if one piece has a higher priority, it goes first */
553    if (a->priority != b->priority)
554        return a->priority > b->priority ? -1 : 1;
555
556    /* otherwise if one has fewer peers, it goes first */
557    if (a->peerCount != b->peerCount)
558        return a->peerCount < b->peerCount ? -1 : 1;
559   
560    /* otherwise if one *might be* fastallowed to us */
561    if (a->fastAllowed != b->fastAllowed)
562        return a->fastAllowed < b->fastAllowed ? -1 : 1;
563
564    /* otherwise go with our random seed */
565    return tr_compareUint16( a->random, b->random );
566}
567
568static int
569isPieceInteresting( const tr_torrent  * tor,
570                    int                 piece )
571{
572    if( tor->info.pieces[piece].dnd ) /* we don't want it */
573        return 0;
574
575    if( tr_cpPieceIsComplete( tor->completion, piece ) ) /* we have it */
576        return 0;
577
578    return 1;
579}
580
581static uint32_t*
582getPreferredPieces( Torrent     * t,
583                    uint32_t    * pieceCount )
584{
585    const tr_torrent * tor = t->tor;
586    const tr_info * inf = &tor->info;
587    int i;
588    uint32_t poolSize = 0;
589    uint32_t * pool = tr_new( uint32_t, inf->pieceCount );
590    int peerCount;
591    tr_peer** peers;
592
593    assert( torrentIsLocked( t ) );
594
595    peers = getConnectedPeers( t, &peerCount );
596
597    for( i=0; i<inf->pieceCount; ++i )
598        if( isPieceInteresting( tor, i ) )
599            pool[poolSize++] = i;
600
601    /* sort the pool from most interesting to least... */
602    if( poolSize > 1 )
603    {
604        uint32_t j;
605        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
606
607        for( j=0; j<poolSize; ++j )
608        {
609            int k;
610            const int piece = pool[j];
611            struct tr_refill_piece * setme = p + j;
612
613            setme->piece = piece;
614            setme->priority = inf->pieces[piece].priority;
615            setme->peerCount = 0;
616            setme->fastAllowed = 0;
617            setme->random = tr_rand( UINT16_MAX );
618            /* FIXME */
619//            setme->fastAllowed = tr_bitfieldHas( t->tor->allowedList, i);
620
621            for( k=0; k<peerCount; ++k ) {
622                const tr_peer * peer = peers[k];
623                if( peer->peerIsInterested && !peer->clientIsChoked && tr_bitfieldHas( peer->have, piece ) )
624                    ++setme->peerCount;
625            }
626        }
627
628        qsort (p, poolSize, sizeof(struct tr_refill_piece), compareRefillPiece);
629
630        for( j=0; j<poolSize; ++j )
631            pool[j] = p[j].piece;
632
633        tr_free( p );
634    }
635
636    tr_free( peers );
637
638    *pieceCount = poolSize;
639    return pool;
640}
641
642static uint64_t*
643getPreferredBlocks( Torrent * t, uint64_t * setmeCount )
644{
645    uint32_t i;
646    uint32_t pieceCount;
647    uint32_t * pieces;
648    uint64_t *req, *unreq, *ret, *walk;
649    int reqCount, unreqCount;
650    const tr_torrent * tor = t->tor;
651
652    assert( torrentIsLocked( t ) );
653
654    pieces = getPreferredPieces( t, &pieceCount );
655
656    req = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
657    reqCount = 0;
658    unreq = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
659    unreqCount = 0;
660
661    for( i=0; i<pieceCount; ++i ) {
662        const uint32_t index = pieces[i];
663        const int begin = tr_torPieceFirstBlock( tor, index );
664        const int end = begin + tr_torPieceCountBlocks( tor, (int)index );
665        int block;
666        for( block=begin; block<end; ++block )
667            if( tr_cpBlockIsComplete( tor->completion, block ) )
668                continue;
669            else if( tr_bitfieldHas( t->requested, block ) )
670                req[reqCount++] = block;
671            else
672                unreq[unreqCount++] = block;
673    }
674
675    ret = walk = tr_new( uint64_t, unreqCount + reqCount );
676    memcpy( walk, unreq, sizeof(uint64_t) * unreqCount );
677    walk += unreqCount;
678    memcpy( walk, req, sizeof(uint64_t) * reqCount );
679    walk += reqCount;
680    assert( ( walk - ret ) == ( unreqCount + reqCount ) );
681    *setmeCount = walk - ret;
682
683    tr_free( req );
684    tr_free( unreq );
685    tr_free( pieces );
686
687    return ret;
688}
689
690static int
691refillPulse( void * vtorrent )
692{
693    Torrent * t = vtorrent;
694    tr_torrent * tor = t->tor;
695    uint32_t i;
696    int peerCount;
697    tr_peer ** peers;
698    uint64_t blockCount;
699    uint64_t * blocks;
700
701    if( !t->isRunning )
702        return TRUE;
703    if( tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE )
704        return TRUE;
705
706    torrentLock( t );
707    tordbg( t, "Refilling Request Buffers..." );
708
709    blocks = getPreferredBlocks( t, &blockCount );
710    peers = getConnectedPeers( t, &peerCount );
711
712    for( i=0; peerCount && i<blockCount; ++i )
713    {
714        const int block = blocks[i];
715        const uint32_t index = tr_torBlockPiece( tor, block );
716        const uint32_t begin = (block * tor->blockSize) - (index * tor->info.pieceSize);
717        const uint32_t length = tr_torBlockCountBytes( tor, block );
718        int j;
719        assert( _tr_block( tor, index, begin ) == block );
720        assert( begin < (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
721        assert( (begin + length) <= (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
722
723
724        /* find a peer who can ask for this block */
725        for( j=0; j<peerCount; )
726        {
727            const int val = tr_peerMsgsAddRequest( peers[j]->msgs, index, begin, length );
728            switch( val )
729            {
730                case TR_ADDREQ_FULL: 
731                case TR_ADDREQ_CLIENT_CHOKED:
732                    memmove( peers+j, peers+j+1, sizeof(tr_peer*)*(--peerCount-j) );
733                    break;
734
735                case TR_ADDREQ_MISSING: 
736                case TR_ADDREQ_DUPLICATE: 
737                    ++j;
738                    break;
739
740                case TR_ADDREQ_OK:
741                    tr_bitfieldAdd( t->requested, block );
742                    j = peerCount;
743                    break;
744
745                default:
746                    assert( 0 && "unhandled value" );
747                    break;
748            }
749        }
750    }
751
752    /* cleanup */
753    tr_free( peers );
754    tr_free( blocks );
755
756    t->refillTimer = NULL;
757    torrentUnlock( t );
758    return FALSE;
759}
760
761static void
762broadcastClientHave( Torrent * t, uint32_t index )
763{
764    int i, size;
765    tr_peer ** peers;
766
767    assert( torrentIsLocked( t ) );
768
769    peers = getConnectedPeers( t, &size );
770    for( i=0; i<size; ++i )
771        tr_peerMsgsHave( peers[i]->msgs, index );
772    tr_free( peers );
773}
774
775static void
776broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
777{
778    int i, size;
779    tr_peer ** peers;
780
781    assert( torrentIsLocked( t ) );
782
783    peers = getConnectedPeers( t, &size );
784    for( i=0; i<size; ++i )
785        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
786    tr_free( peers );
787}
788
789static void
790msgsCallbackFunc( void * vpeer, void * vevent, void * vt )
791{
792    tr_peer * peer = vpeer;
793    Torrent * t = (Torrent *) vt;
794    const tr_peermsgs_event * e = (const tr_peermsgs_event *) vevent;
795
796    torrentLock( t );
797
798    switch( e->eventType )
799    {
800        case TR_PEERMSG_NEED_REQ:
801            if( t->refillTimer == NULL )
802                t->refillTimer = tr_timerNew( t->manager->handle,
803                                              refillPulse, t,
804                                              REFILL_PERIOD_MSEC );
805            break;
806
807        case TR_PEERMSG_CANCEL:
808            tr_bitfieldRem( t->requested, _tr_block( t->tor, e->pieceIndex, e->offset ) );
809            break;
810
811        case TR_PEERMSG_CLIENT_HAVE:
812            broadcastClientHave( t, e->pieceIndex );
813            tr_torrentRecheckCompleteness( t->tor );
814            break;
815
816        case TR_PEERMSG_PEER_PROGRESS: {
817            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
818            const int peerIsSeed = e->progress >= 1.0;
819            if( peerIsSeed ) {
820                tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr(&atom->addr,atom->port) );
821                atom->flags |= ADDED_F_SEED_FLAG;
822            } else {
823                tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr(&atom->addr,atom->port) );
824                atom->flags &= ~ADDED_F_SEED_FLAG;
825            } break;
826        }
827
828        case TR_PEERMSG_CLIENT_BLOCK:
829            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
830            break;
831
832        case TR_PEERMSG_GOT_ERROR:
833            peer->doPurge = 1;
834            break;
835
836        default:
837            assert(0);
838    }
839
840    torrentUnlock( t );
841}
842
843static void
844ensureAtomExists( Torrent * t, const struct in_addr * addr, uint16_t port, uint8_t flags, uint8_t from )
845{
846    if( !peerIsKnown( t, addr ) )
847    {
848        struct peer_atom * a = tr_new0( struct peer_atom, 1 );
849        a->addr = *addr;
850        a->port = port;
851        a->flags = flags;
852        a->from = from;
853        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr(&a->addr,a->port) );
854        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
855    }
856}
857
858/* FIXME: this is kind of a mess. */
859static void
860myHandshakeDoneCB( tr_handshake    * handshake,
861                   tr_peerIo       * io,
862                   int               isConnected,
863                   const uint8_t   * peer_id,
864                   void            * vmanager )
865{
866    int ok = isConnected;
867    uint16_t port;
868    const struct in_addr * in_addr;
869    tr_peerMgr * manager = (tr_peerMgr*) vmanager;
870    Torrent * t;
871    tr_handshake * ours;
872
873    assert( io != NULL );
874    assert( isConnected==0 || isConnected==1 );
875
876    t = tr_peerIoHasTorrentHash( io )
877        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
878        : NULL;
879
880    if( tr_peerIoIsIncoming ( io ) )
881        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
882                                        handshake, handshakeCompare );
883    else if( t != NULL )
884        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
885                                        handshake, handshakeCompare );
886    else
887        ours = handshake;
888
889    assert( ours != NULL );
890    assert( ours == handshake );
891
892    if( t != NULL )
893        torrentLock( t );
894
895    in_addr = tr_peerIoGetAddress( io, &port );
896
897    if( !t || !t->isRunning )
898    {
899        tr_peerIoFree( io );
900        --manager->connectionCount;
901    }
902    else if( !ok )
903    {
904        /* if we couldn't connect or were snubbed,
905         * the peer's probably not worth remembering. */
906        tr_peer * peer = getExistingPeer( t, in_addr );
907        tr_peerIoFree( io );
908        --manager->connectionCount;
909        if( peer )
910            peer->doPurge = 1;
911    }
912    else /* looking good */
913    {
914        uint16_t port;
915        const struct in_addr * addr = tr_peerIoGetAddress( io,  &port );
916        struct peer_atom * atom;
917        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
918        atom = getExistingAtom( t, addr );
919        tr_peer * peer = getPeer( t, addr );
920        if( atom->myflags & MYFLAG_BANNED )
921        {
922            tordbg( t, "banned peer %s tried to reconnect", tr_peerIoAddrStr(&atom->addr,atom->port) );
923            tr_peerIoFree( io );
924            --manager->connectionCount;
925            peer->doPurge = 1;
926        }
927        else if( peer->msgs != NULL ) /* we already have this peer */
928        {
929            tr_peerIoFree( io );
930            --manager->connectionCount;
931        }
932        else
933        {
934            tr_free( peer->client );
935            peer->client = peer_id ? tr_clientForId( peer_id ) : NULL;
936            peer->port = port;
937            peer->io = io;
938            peer->msgs = tr_peerMsgsNew( t->tor, peer, msgsCallbackFunc, t, &peer->msgsTag );
939        }
940    }
941
942    if( t != NULL )
943        torrentUnlock( t );
944}
945
946void
947tr_peerMgrAddIncoming( tr_peerMgr      * manager,
948                       struct in_addr  * addr,
949                       uint16_t          port,
950                       int               socket )
951{
952    managerLock( manager );
953
954    if( getExistingHandshake( manager->incomingHandshakes, addr ) == NULL )
955    {
956        tr_peerIo * io = tr_peerIoNewIncoming( manager->handle, addr, port, socket );
957
958        tr_handshake * handshake = tr_handshakeNew( io,
959                                                    manager->handle->encryptionMode,
960                                                    myHandshakeDoneCB,
961                                                    manager );
962        ++manager->connectionCount;
963
964        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake, handshakeCompare );
965    }
966
967    managerUnlock( manager );
968}
969
970void
971tr_peerMgrAddPex( tr_peerMgr     * manager,
972                  const uint8_t  * torrentHash,
973                  int              from,
974                  const tr_pex   * pex,
975                  int              pexCount )
976{
977    Torrent * t;
978    const tr_pex * end;
979
980    managerLock( manager );
981
982    t = getExistingTorrent( manager, torrentHash );
983    for( end=pex+pexCount; pex!=end; ++pex )
984        ensureAtomExists( t, &pex->in_addr, pex->port, pex->flags, from );
985
986    managerUnlock( manager );
987}
988
989void
990tr_peerMgrAddPeers( tr_peerMgr    * manager,
991                    const uint8_t * torrentHash,
992                    int             from,
993                    const uint8_t * peerCompact,
994                    int             peerCount )
995{
996    int i;
997    const uint8_t * walk = peerCompact;
998    Torrent * t;
999
1000    managerLock( manager );
1001
1002    t = getExistingTorrent( manager, torrentHash );
1003    for( i=0; t!=NULL && i<peerCount; ++i )
1004    {
1005        struct in_addr addr;
1006        uint16_t port;
1007        memcpy( &addr, walk, 4 ); walk += 4;
1008        memcpy( &port, walk, 2 ); walk += 2;
1009        ensureAtomExists( t, &addr, port, 0, from );
1010    }
1011
1012    managerUnlock( manager );
1013}
1014
1015/**
1016***
1017**/
1018
1019void
1020tr_peerMgrSetBlame( tr_peerMgr     * manager,
1021                    const uint8_t  * torrentHash,
1022                    int              pieceIndex,
1023                    int              success )
1024{
1025    if( !success )
1026    {
1027        int peerCount, i;
1028        Torrent * t = getExistingTorrent( manager, torrentHash );
1029        tr_peer ** peers;
1030
1031        assert( torrentIsLocked( t ) );
1032
1033        peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1034        for( i=0; i<peerCount; ++i )
1035        {
1036            struct peer_atom * atom;
1037            tr_peer * peer;
1038
1039            peer = peers[i];
1040            if( !tr_bitfieldHas( peer->blame, pieceIndex ) )
1041                continue;
1042
1043            ++peer->strikes;
1044            tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1045                       tr_peerIoAddrStr(&peer->in_addr,peer->port),
1046                       pieceIndex, (int)peer->strikes );
1047            if( peer->strikes < MAX_BAD_PIECES_PER_PEER )
1048                continue;
1049
1050            atom = getExistingAtom( t, &peer->in_addr );
1051            atom->myflags |= MYFLAG_BANNED;
1052            peer->doPurge = 1;
1053            tordbg( t, "banning peer %s due to corrupt data", tr_peerIoAddrStr(&atom->addr,atom->port) );
1054        }
1055    }
1056}
1057
1058int
1059tr_pexCompare( const void * va, const void * vb )
1060{
1061    const tr_pex * a = (const tr_pex *) va;
1062    const tr_pex * b = (const tr_pex *) vb;
1063    int i = memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
1064    if( i ) return i;
1065    if( a->port < b->port ) return -1;
1066    if( a->port > b->port ) return 1;
1067    return 0;
1068}
1069
1070int tr_pexCompare( const void * a, const void * b );
1071
1072static int
1073peerPrefersCrypto( const tr_peer * peer )
1074{
1075    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1076        return TRUE;
1077
1078    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1079        return FALSE;
1080
1081    return tr_peerIoIsEncrypted( peer->io );
1082};
1083
1084int
1085tr_peerMgrGetPeers( tr_peerMgr      * manager,
1086                    const uint8_t   * torrentHash,
1087                    tr_pex         ** setme_pex )
1088{
1089    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1090    int i, peerCount;
1091    const tr_peer ** peers;
1092    tr_pex * pex;
1093    tr_pex * walk;
1094
1095    torrentLock( (Torrent*)t );
1096
1097    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1098    pex = walk = tr_new( tr_pex, peerCount );
1099
1100    for( i=0; i<peerCount; ++i, ++walk )
1101    {
1102        const tr_peer * peer = peers[i];
1103
1104        walk->in_addr = peer->in_addr;
1105
1106        walk->port = peer->port;
1107
1108        walk->flags = 0;
1109        if( peerPrefersCrypto(peer) )  walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1110        if( peer->progress >= 1.0 )    walk->flags |= ADDED_F_SEED_FLAG;
1111    }
1112
1113    assert( ( walk - pex ) == peerCount );
1114    qsort( pex, peerCount, sizeof(tr_pex), tr_pexCompare );
1115    *setme_pex = pex;
1116
1117    torrentUnlock( (Torrent*)t );
1118
1119    return peerCount;
1120}
1121
1122static int reconnectPulse( void * vtorrent );
1123static int rechokePulse( void * vtorrent );
1124
1125void
1126tr_peerMgrStartTorrent( tr_peerMgr     * manager,
1127                        const uint8_t  * torrentHash )
1128{
1129    Torrent * t;
1130
1131    managerLock( manager );
1132
1133    t = getExistingTorrent( manager, torrentHash );
1134
1135    assert( t != NULL );
1136    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1137    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1138
1139    if( !t->isRunning )
1140    {
1141        t->isRunning = 1;
1142
1143        t->reconnectTimer = tr_timerNew( t->manager->handle,
1144                                         reconnectPulse, t,
1145                                         RECONNECT_PERIOD_MSEC );
1146
1147        t->rechokeTimer = tr_timerNew( t->manager->handle,
1148                                       rechokePulse, t,
1149                                       RECHOKE_PERIOD_MSEC );
1150
1151        reconnectPulse( t );
1152
1153        rechokePulse( t );
1154    }
1155
1156    managerUnlock( manager );
1157}
1158
1159static void
1160stopTorrent( Torrent * t )
1161{
1162    assert( torrentIsLocked( t ) );
1163
1164    t->isRunning = 0;
1165    tr_timerFree( &t->rechokeTimer );
1166    tr_timerFree( &t->reconnectTimer );
1167
1168    /* disconnect the peers. */
1169    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)peerDestructor );
1170    tr_ptrArrayClear( t->peers );
1171
1172    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1173     * which removes the handshake from t->outgoingHandshakes... */
1174    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1175        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1176}
1177void
1178tr_peerMgrStopTorrent( tr_peerMgr     * manager,
1179                       const uint8_t  * torrentHash)
1180{
1181    managerLock( manager );
1182
1183    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1184
1185    managerUnlock( manager );
1186}
1187
1188void
1189tr_peerMgrAddTorrent( tr_peerMgr * manager,
1190                      tr_torrent * tor )
1191{
1192    Torrent * t;
1193
1194    managerLock( manager );
1195
1196    assert( tor != NULL );
1197    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1198
1199    t = torrentConstructor( manager, tor );
1200    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1201
1202    managerUnlock( manager );
1203}
1204
1205void
1206tr_peerMgrRemoveTorrent( tr_peerMgr     * manager,
1207                         const uint8_t  * torrentHash )
1208{
1209    Torrent * t;
1210
1211    managerLock( manager );
1212
1213    t = getExistingTorrent( manager, torrentHash );
1214    assert( t != NULL );
1215    stopTorrent( t );
1216    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1217    torrentDestructor( t );
1218
1219    managerUnlock( manager );
1220}
1221
1222void
1223tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1224                               const uint8_t    * torrentHash,
1225                               int8_t           * tab,
1226                               int                tabCount )
1227{
1228    int i;
1229    const Torrent * t;
1230    const tr_torrent * tor;
1231    float interval;
1232
1233    managerLock( (tr_peerMgr*)manager );
1234
1235    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1236    tor = t->tor;
1237    interval = tor->info.pieceCount / (float)tabCount;
1238
1239    memset( tab, 0, tabCount );
1240
1241    for( i=0; i<tabCount; ++i )
1242    {
1243        const int piece = i * interval;
1244
1245        if( tor == NULL )
1246            tab[i] = 0;
1247        else if( tr_cpPieceIsComplete( tor->completion, piece ) )
1248            tab[i] = -1;
1249        else {
1250            int j, peerCount;
1251            const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1252            for( j=0; j<peerCount; ++j )
1253                if( tr_bitfieldHas( peers[j]->have, i ) )
1254                    ++tab[i];
1255        }
1256    }
1257
1258    managerUnlock( (tr_peerMgr*)manager );
1259}
1260
1261/* Returns the pieces that we and/or a connected peer has */
1262tr_bitfield*
1263tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1264                        const uint8_t    * torrentHash )
1265{
1266    int i, size;
1267    const Torrent * t;
1268    const tr_peer ** peers;
1269    tr_bitfield * pieces;
1270
1271    managerLock( (tr_peerMgr*)manager );
1272
1273    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1274    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1275    pieces = tr_bitfieldDup( tr_cpPieceBitfield( t->tor->completion ) );
1276    for( i=0; i<size; ++i )
1277        if( peers[i]->io != NULL )
1278            tr_bitfieldAnd( pieces, peers[i]->have );
1279
1280    managerUnlock( (tr_peerMgr*)manager );
1281    return pieces;
1282}
1283
1284void
1285tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1286                        const uint8_t    * torrentHash,
1287                        int              * setmePeersKnown,
1288                        int              * setmePeersConnected,
1289                        int              * setmePeersSendingToUs,
1290                        int              * setmePeersGettingFromUs,
1291                        int              * setmePeersFrom )
1292{
1293    int i, size;
1294    const Torrent * t;
1295    const tr_peer ** peers;
1296
1297    managerLock( (tr_peerMgr*)manager );
1298
1299    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1300    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1301
1302    *setmePeersKnown          = tr_ptrArraySize( t->pool );
1303    *setmePeersConnected      = 0;
1304    *setmePeersSendingToUs    = 0;
1305    *setmePeersGettingFromUs  = 0;
1306
1307    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1308        setmePeersFrom[i] = 0;
1309
1310    for( i=0; i<size; ++i )
1311    {
1312        const tr_peer * peer = peers[i];
1313        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1314
1315        if( peer->io == NULL ) /* not connected */
1316            continue;
1317
1318        ++*setmePeersConnected;
1319
1320        ++setmePeersFrom[atom->from];
1321
1322        if( peer->rateToPeer > 0.01 )
1323            ++*setmePeersGettingFromUs;
1324
1325        if( peer->rateToClient > 0.01 )
1326            ++*setmePeersSendingToUs;
1327    }
1328
1329    managerUnlock( (tr_peerMgr*)manager );
1330}
1331
1332struct tr_peer_stat *
1333tr_peerMgrPeerStats( const tr_peerMgr  * manager,
1334                     const uint8_t     * torrentHash,
1335                     int               * setmeCount UNUSED )
1336{
1337    int i, size;
1338    const Torrent * t;
1339    const tr_peer ** peers;
1340    tr_peer_stat * ret;
1341
1342    assert( manager != NULL );
1343    managerLock( (tr_peerMgr*)manager );
1344
1345    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1346    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1347
1348    ret = tr_new0( tr_peer_stat, size );
1349
1350    for( i=0; i<size; ++i )
1351    {
1352        const tr_peer * peer = peers[i];
1353        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1354        tr_peer_stat * stat = ret + i;
1355
1356        tr_netNtop( &peer->in_addr, stat->addr, sizeof(stat->addr) );
1357        stat->port             = peer->port;
1358        stat->from             = atom->from;
1359        stat->client           = tr_strdup( peer->client ? peer->client : "" );
1360        stat->progress         = peer->progress;
1361        stat->isEncrypted      = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1362        stat->uploadToRate     = peer->rateToPeer;
1363        stat->downloadFromRate = peer->rateToClient;
1364        stat->isDownloading    = stat->uploadToRate > 0.01;
1365        stat->isUploading      = stat->downloadFromRate > 0.01;
1366    }
1367
1368    *setmeCount = size;
1369
1370    managerUnlock( (tr_peerMgr*)manager );
1371    return ret;
1372}
1373
1374/**
1375***
1376**/
1377
1378struct ChokeData
1379{
1380    tr_peer * peer;
1381    float rate;
1382    int randomKey;
1383    int preferred;
1384    int doUnchoke;
1385};
1386
1387static int
1388compareChoke( const void * va, const void * vb )
1389{
1390    const struct ChokeData * a = va;
1391    const struct ChokeData * b = vb;
1392
1393    if( a->preferred != b->preferred )
1394        return a->preferred ? -1 : 1;
1395
1396    if( a->preferred )
1397    {
1398        if( a->rate > b->rate ) return -1;
1399        if( a->rate < b->rate ) return 1;
1400        return 0;
1401    }
1402    else
1403    {
1404        return a->randomKey - b->randomKey;
1405    }
1406}
1407
1408static int
1409clientIsSnubbedBy( const tr_peer * peer )
1410{
1411    assert( peer != NULL );
1412
1413    return peer->peerSentPieceDataAt < (time(NULL) - SNUBBED_SEC);
1414}
1415
1416/**
1417***
1418**/
1419
1420static double
1421getWeightedThroughput( const tr_peer * peer )
1422{
1423    return ( 3 * peer->rateToPeer )
1424         + ( 1 * peer->rateToClient );
1425}
1426
1427static void
1428rechoke( Torrent * t )
1429{
1430    int i, peerCount, size=0, unchoked=0;
1431    const time_t ignorePeersNewerThan = time(NULL) - MIN_CHOKE_PERIOD_SEC;
1432    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1433    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1434
1435    assert( torrentIsLocked( t ) );
1436   
1437    /* sort the peers by preference and rate */
1438    for( i=0; i<peerCount; ++i )
1439    {
1440        tr_peer * peer = peers[i];
1441        struct ChokeData * node;
1442        if( peer->chokeChangedAt > ignorePeersNewerThan )
1443            continue;
1444
1445        node = &choke[size++];
1446        node->peer = peer;
1447        node->preferred = peer->peerIsInterested && !clientIsSnubbedBy(peer);
1448        node->randomKey = tr_rand( INT_MAX );
1449        node->rate = getWeightedThroughput( peer );
1450    }
1451
1452    qsort( choke, size, sizeof(struct ChokeData), compareChoke );
1453
1454    for( i=0; i<size && i<NUM_UNCHOKED_PEERS_PER_TORRENT; ++i ) {
1455        choke[i].doUnchoke = 1;
1456        ++unchoked;
1457    }
1458
1459    for( ; i<size; ++i ) {
1460        choke[i].doUnchoke = 1;
1461        ++unchoked;
1462        if( choke[i].peer->peerIsInterested )
1463            break;
1464    }
1465
1466    for( i=0; i<size; ++i )
1467        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1468
1469    /* cleanup */
1470    tr_free( choke );
1471    tr_free( peers );
1472}
1473
1474static int
1475rechokePulse( void * vtorrent )
1476{
1477    Torrent * t = vtorrent;
1478    torrentLock( t );
1479    rechoke( t );
1480    torrentUnlock( t );
1481    return TRUE;
1482}
1483
1484/***
1485****
1486****
1487****
1488***/
1489
1490#define LAISSEZ_FAIRE_PERIOD_SECS 90
1491
1492static tr_peer **
1493getWeakConnections( Torrent * t, int * setmeSize )
1494{
1495    int i, insize, outsize;
1496    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( t->peers, &insize );
1497    struct tr_peer ** ret = tr_new( tr_peer*, insize );
1498    const int clientIsSeed = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1499    const time_t now = time( NULL );
1500
1501    assert( torrentIsLocked( t ) );
1502
1503    for( i=outsize=0; i<insize; ++i )
1504    {
1505        tr_peer * peer = peers[i];
1506        int isWeak;
1507        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1508        const int peerIsSeed = atom->flags & ADDED_F_SEED_FLAG;
1509        const double throughput = getWeightedThroughput( peer );
1510
1511        assert( atom != NULL );
1512
1513        if( peer->doPurge )
1514            isWeak = TRUE;
1515        else if( throughput >= 3 )
1516            isWeak = FALSE;
1517        else if( peerIsSeed && clientIsSeed )
1518            isWeak = t->tor->pexDisabled || (now-atom->time>=30);
1519        else if( ( now - atom->time ) < LAISSEZ_FAIRE_PERIOD_SECS )
1520            isWeak = FALSE;
1521        else
1522            isWeak = ( now - peer->pieceDataActivityDate ) > 180;
1523
1524        if( isWeak )
1525            ret[outsize++] = peer;
1526    }
1527
1528    *setmeSize = outsize;
1529    return ret;
1530}
1531
1532static int
1533compareAtomByTime( const void * va, const void * vb )
1534{
1535    const struct peer_atom * a = * (const struct peer_atom**) va;
1536    const struct peer_atom * b = * (const struct peer_atom**) vb;
1537    if( a->time < b->time ) return -1;
1538    if( a->time > b->time ) return 1;
1539    return 0;
1540}
1541
1542static struct peer_atom **
1543getPeerCandidates( Torrent * t, int * setmeSize )
1544{
1545    int i, insize, outsize;
1546    struct peer_atom ** atoms;
1547    struct peer_atom ** ret;
1548    const time_t now = time( NULL );
1549    const int seed = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1550
1551    assert( torrentIsLocked( t ) );
1552
1553    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &insize );
1554    ret = tr_new( struct peer_atom*, insize );
1555    for( i=outsize=0; i<insize; ++i )
1556    {
1557        struct peer_atom * atom = atoms[i];
1558
1559        /* peer fed us too much bad data ... we only keep it around
1560         * now to weed it out in case someone sends it to us via pex */
1561        if( atom->myflags & MYFLAG_BANNED ) {
1562            tordbg( t, "RECONNECT peer %d (%s) is banned...",
1563                    i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1564            continue;
1565        }
1566
1567        /* we don't need two connections to the same peer... */
1568        if( peerIsInUse( t, &atom->addr ) ) {
1569            tordbg( t, "RECONNECT peer %d (%s) is in use..",
1570                    i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1571            continue;
1572        }
1573
1574        /* no need to connect if we're both seeds... */
1575        if( seed && (atom->flags & ADDED_F_SEED_FLAG) ) {
1576            tordbg( t, "RECONNECT peer %d (%s) is a seed and so are we..",
1577                    i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1578            continue;
1579        }
1580
1581        /* if we used this peer recently, give someone else a turn */
1582        if( ( now - atom->time ) < LAISSEZ_FAIRE_PERIOD_SECS ) {
1583            tordbg( t, "RECONNECT peer %d (%s) is in its grace period..",
1584                    i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1585            continue;
1586        }
1587
1588        ret[outsize++] = atom;
1589    }
1590
1591    qsort( ret, outsize, sizeof(struct peer_atom*), compareAtomByTime );
1592    *setmeSize = outsize;
1593    return ret;
1594}
1595
1596static int
1597reconnectPulse( void * vtorrent )
1598{
1599    Torrent * t = vtorrent;
1600
1601    torrentLock( t );
1602
1603    if( !t->isRunning )
1604    {
1605        removeAllPeers( t );
1606    }
1607    else
1608    {
1609        int i, nCandidates, nWeak, nAdd;
1610        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
1611        struct tr_peer ** connections = getWeakConnections( t, &nWeak );
1612        const int peerCount = tr_ptrArraySize( t->peers );
1613
1614        tordbg( t, "RECONNECT pulse for [%s]: %d weak connections, %d connection candidates, %d atoms, max per pulse is %d",
1615                 t->tor->info.name, nWeak, nCandidates, tr_ptrArraySize(t->pool), (int)MAX_RECONNECTIONS_PER_PULSE );
1616
1617        /* disconnect some peers */
1618        for( i=0; i<nWeak; ++i )
1619            removePeer( t, connections[i] );
1620
1621        /* add some new ones */
1622        nAdd = MAX_CONNECTED_PEERS_PER_TORRENT - peerCount;
1623        for( i=0; i<nAdd && i<nCandidates && i<MAX_RECONNECTIONS_PER_PULSE; ++i )
1624        //for( i=0; i<nCandidates; ++i )
1625        {
1626            tr_peerMgr * mgr = t->manager;
1627
1628            struct peer_atom * atom = candidates[i];
1629
1630            tr_peerIo * io = tr_peerIoNewOutgoing( mgr->handle, &atom->addr, atom->port, t->hash );
1631
1632            tr_handshake * handshake = tr_handshakeNew( io,
1633                                                        mgr->handle->encryptionMode,
1634                                                        myHandshakeDoneCB,
1635                                                        mgr );
1636            ++mgr->connectionCount;
1637
1638            assert( tr_peerIoGetTorrentHash( io ) != NULL );
1639
1640            tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake, handshakeCompare );
1641
1642            atom->time = time( NULL );
1643        }
1644
1645        /* cleanup */
1646        tr_free( connections );
1647        tr_free( candidates );
1648    }
1649
1650    torrentUnlock( t );
1651    return TRUE;
1652}
Note: See TracBrowser for help on using the repository browser.