source: trunk/libtransmission/peer-mgr.c @ 3273

Last change on this file since 3273 was 3273, checked in by charles, 15 years ago

make the peer manager a little greedier w.r.t. keeping connections alive.

  • Property svn:keywords set to Date Rev Author Id
File size: 44.2 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 3273 2007-10-02 19:25:18Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17#include <limits.h> /* INT_MAX */
18
19#include <sys/types.h> /* event.h needs this */
20#include <event.h>
21
22#include "transmission.h"
23#include "clients.h"
24#include "completion.h"
25#include "crypto.h"
26#include "handshake.h"
27#include "net.h"
28#include "peer-io.h"
29#include "peer-mgr.h"
30#include "peer-mgr-private.h"
31#include "peer-msgs.h"
32#include "platform.h"
33#include "ptrarray.h"
34#include "ratecontrol.h"
35#include "shared.h"
36#include "trevent.h"
37#include "utils.h"
38
39enum
40{
41    /* how frequently to change which peers are choked */
42    RECHOKE_PERIOD_MSEC = (15 * 1000),
43
44    /* how frequently to decide which peers live and die */
45    RECONNECT_PERIOD_MSEC = (5 * 1000),
46
47    /* how frequently to refill peers' request lists */
48    REFILL_PERIOD_MSEC = 1000,
49
50    /* how many peers to unchoke per-torrent. */
51    /* FIXME: make this user-configurable? */
52    NUM_UNCHOKED_PEERS_PER_TORRENT = 8,
53
54    /* don't change a peer's choke status more often than this */
55    MIN_CHOKE_PERIOD_SEC = 10,
56
57    /* following the BT spec, we consider ourselves `snubbed' if
58     * we're we don't get piece data from a peer in this long */
59    SNUBBED_SEC = 60,
60
61    /* this is arbitrary and, hopefully, temporary until we come up
62     * with a better idea for managing the connection limits */
63    MAX_CONNECTED_PEERS_PER_TORRENT = 60,
64
65    /* another arbitrary number */
66    MAX_RECONNECTIONS_PER_MINUTE = 60,
67
68    MAX_RECONNECTIONS_PER_PULSE =
69        ((MAX_RECONNECTIONS_PER_MINUTE * RECONNECT_PERIOD_MSEC) / (60*1000)),
70
71    /* corresponds to ut_pex's added.f flags */
72    ADDED_F_ENCRYPTION_FLAG = 1,
73
74    /* corresponds to ut_pex's added.f flags */
75    ADDED_F_SEED_FLAG = 2
76};
77
78
79/**
80***
81**/
82
83/* We keep one of these for every peer we know about, whether
84 * it's connected or not, so the struct must be small.
85 * When our current connections underperform, we dip back
86 * into this list for new ones. */
87struct peer_atom
88{   
89    uint8_t from;
90    uint8_t flags; /* these match the added_f flags */
91    uint16_t port;
92    struct in_addr addr; 
93    time_t time;
94};
95
96typedef struct
97{
98    uint8_t hash[SHA_DIGEST_LENGTH];
99    tr_ptrArray * outgoingHandshakes; /* tr_handshake */
100    tr_ptrArray * pool; /* struct peer_atom */
101    tr_ptrArray * peers; /* tr_peer */
102    tr_timer * reconnectTimer;
103    tr_timer * rechokeTimer;
104    tr_timer * refillTimer;
105    tr_torrent * tor;
106    tr_bitfield * requested;
107
108    unsigned int isRunning : 1;
109
110    struct tr_peerMgr * manager;
111}
112Torrent;
113
114struct tr_peerMgr
115{
116    int connectionCount;
117    tr_handle * handle;
118    tr_ptrArray * torrents; /* Torrent */
119    tr_ptrArray * incomingHandshakes; /* tr_handshake */
120};
121
122/**
123***
124**/
125
126static void
127myDebug( const char * file, int line, const Torrent * t, const char * fmt, ... )
128{
129    FILE * fp = tr_getLog( );
130    if( fp != NULL )
131    {
132        va_list args;
133        struct evbuffer * buf = evbuffer_new( );
134        evbuffer_add_printf( buf, "[%s:%d] %s ", file, line, t->tor->info.name  );
135        va_start( args, fmt );
136        evbuffer_add_vprintf( buf, fmt, args );
137        va_end( args );
138        fprintf( fp, "%s\n", EVBUFFER_DATA(buf) );
139        evbuffer_free( buf );
140    }
141}
142
143#define tordbg(t, fmt...) myDebug(__FILE__, __LINE__, t, ##fmt )
144
145/**
146***
147**/
148
149static void
150managerLock( struct tr_peerMgr * manager )
151{
152    tr_globalLock( manager->handle );
153}
154static void
155managerUnlock( struct tr_peerMgr * manager )
156{
157    tr_globalUnlock( manager->handle );
158}
159static void
160torrentLock( Torrent * torrent )
161{
162    managerLock( torrent->manager );
163}
164static void
165torrentUnlock( Torrent * torrent )
166{
167    managerUnlock( torrent->manager );
168}
169static int
170torrentIsLocked( const Torrent * t )
171{
172    return tr_globalIsLocked( t->manager->handle );
173}
174
175/**
176***
177**/
178
179static int
180compareAddresses( const struct in_addr * a, const struct in_addr * b )
181{
182    return tr_compareUint32( a->s_addr, b->s_addr );
183}
184
185static int
186handshakeCompareToAddr( const void * va, const void * vb )
187{
188    const tr_handshake * a = va;
189    return compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
190}
191
192static int
193handshakeCompare( const void * a, const void * b )
194{
195    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
196}
197
198static tr_handshake*
199getExistingHandshake( tr_ptrArray * handshakes, const struct in_addr * in_addr )
200{
201    return tr_ptrArrayFindSorted( handshakes,
202                                  in_addr,
203                                  handshakeCompareToAddr );
204}
205
206static int
207comparePeerAtomToAddress( const void * va, const void * vb )
208{
209    const struct peer_atom * a = va;
210    return compareAddresses( &a->addr, vb );
211}
212
213static int
214comparePeerAtoms( const void * va, const void * vb )
215{
216    const struct peer_atom * b = vb;
217    return comparePeerAtomToAddress( va, &b->addr );
218}
219
220/**
221***
222**/
223
224static int
225torrentCompare( const void * va, const void * vb )
226{
227    const Torrent * a = va;
228    const Torrent * b = vb;
229    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
230}
231
232static int
233torrentCompareToHash( const void * va, const void * vb )
234{
235    const Torrent * a = (const Torrent*) va;
236    const uint8_t * b_hash = (const uint8_t*) vb;
237    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
238}
239
240static Torrent*
241getExistingTorrent( tr_peerMgr * manager, const uint8_t * hash )
242{
243    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
244                                             hash,
245                                             torrentCompareToHash );
246}
247
248static int
249peerCompare( const void * va, const void * vb )
250{
251    const tr_peer * a = (const tr_peer *) va;
252    const tr_peer * b = (const tr_peer *) vb;
253    return compareAddresses( &a->in_addr, &b->in_addr );
254}
255
256static int
257peerCompareToAddr( const void * va, const void * vb )
258{
259    const tr_peer * a = (const tr_peer *) va;
260    return compareAddresses( &a->in_addr, vb );
261}
262
263static tr_peer*
264getExistingPeer( Torrent * torrent, const struct in_addr * in_addr )
265{
266    assert( torrentIsLocked( torrent ) );
267    assert( in_addr != NULL );
268
269    return (tr_peer*) tr_ptrArrayFindSorted( torrent->peers,
270                                             in_addr,
271                                             peerCompareToAddr );
272}
273
274static struct peer_atom*
275getExistingAtom( const Torrent * t, const struct in_addr * addr )
276{
277    assert( torrentIsLocked( t ) );
278    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
279}
280
281static int
282peerIsKnown( const Torrent * t, const struct in_addr * addr )
283{
284    return getExistingAtom( t, addr ) != NULL;
285}
286
287static int
288peerIsInUse( const Torrent * ct, const struct in_addr * addr )
289{
290    Torrent * t = (Torrent*) ct;
291
292    assert( torrentIsLocked ( t ) );
293
294    return ( getExistingPeer( t, addr ) != NULL )
295        || ( getExistingHandshake( t->outgoingHandshakes, addr ) != NULL )
296        || ( getExistingHandshake( t->manager->incomingHandshakes, addr ) != NULL );
297}
298
299static tr_peer*
300getPeer( Torrent * torrent, const struct in_addr * in_addr )
301{
302    tr_peer * peer;
303
304    assert( torrentIsLocked( torrent ) );
305
306    peer = getExistingPeer( torrent, in_addr );
307
308    if( peer == NULL )
309    {
310        peer = tr_new0( tr_peer, 1 );
311        peer->rateToClient = tr_rcInit( );
312        peer->rateToPeer = tr_rcInit( );
313        memcpy( &peer->in_addr, in_addr, sizeof(struct in_addr) );
314        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
315    }
316
317    return peer;
318}
319
320static void
321disconnectPeer( tr_peer * peer )
322{
323    assert( peer != NULL );
324
325    tr_peerIoFree( peer->io );
326    peer->io = NULL;
327
328    if( peer->msgs != NULL )
329    {
330        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
331        tr_peerMsgsFree( peer->msgs );
332        peer->msgs = NULL;
333    }
334
335    tr_bitfieldFree( peer->have );
336    peer->have = NULL;
337
338    tr_bitfieldFree( peer->blame );
339    peer->blame = NULL;
340
341    tr_bitfieldFree( peer->banned );
342    peer->banned = NULL;
343}
344
345static void
346freePeer( tr_peer * peer )
347{
348    disconnectPeer( peer );
349    tr_rcClose( peer->rateToClient );
350    tr_rcClose( peer->rateToPeer );
351    tr_free( peer->client );
352    tr_free( peer );
353}
354
355static void
356removePeer( Torrent * t, tr_peer * peer )
357{
358    tr_peer * removed;
359    struct peer_atom * atom;
360
361    assert( torrentIsLocked( t ) );
362
363    atom = getExistingAtom( t, &peer->in_addr );
364    assert( atom != NULL );
365    atom->time = time( NULL );
366
367    removed = tr_ptrArrayRemoveSorted  ( t->peers, peer, peerCompare );
368    assert( removed == peer );
369    freePeer( removed );
370}
371
372static void
373removeAllPeers( Torrent * t )
374{
375    while( !tr_ptrArrayEmpty( t->peers ) )
376        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
377}
378
379/* torrent must have already been removed from manager->torrents */
380static void
381torrentDestructor( Torrent * t )
382{
383    uint8_t hash[SHA_DIGEST_LENGTH];
384
385    assert( t != NULL );
386    assert( !t->isRunning );
387    assert( t->peers != NULL );
388    assert( torrentIsLocked( t ) );
389    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
390    assert( tr_ptrArrayEmpty( t->peers ) );
391
392    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
393
394    tr_timerFree( &t->reconnectTimer );
395    tr_timerFree( &t->rechokeTimer );
396    tr_timerFree( &t->refillTimer );
397
398    tr_bitfieldFree( t->requested );
399    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
400    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
401
402    tr_free( t );
403}
404
405static int reconnectPulse( void * vtorrent );
406
407static void
408restartReconnectTimer( Torrent * t )
409{
410    tr_timerFree( &t->reconnectTimer );
411    if( t->isRunning )
412        t->reconnectTimer = tr_timerNew( t->manager->handle, reconnectPulse, t, RECONNECT_PERIOD_MSEC );
413}
414
415static int rechokePulse( void * vtorrent );
416
417static void
418restartChokeTimer( Torrent * t )
419{
420    tr_timerFree( &t->rechokeTimer );
421    if( t->isRunning )
422        t->rechokeTimer = tr_timerNew( t->manager->handle, rechokePulse, t, RECHOKE_PERIOD_MSEC );
423}
424
425static Torrent*
426torrentConstructor( tr_peerMgr * manager, tr_torrent * tor )
427{
428    Torrent * t;
429
430    t = tr_new0( Torrent, 1 );
431    t->manager = manager;
432    t->tor = tor;
433    t->pool = tr_ptrArrayNew( );
434    t->peers = tr_ptrArrayNew( );
435    t->outgoingHandshakes = tr_ptrArrayNew( );
436    t->requested = tr_bitfieldNew( tor->blockCount );
437    restartChokeTimer( t );
438    restartReconnectTimer( t );
439    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
440
441    return t;
442}
443
444/**
445***
446**/
447
448struct tr_bitfield *
449tr_peerMgrGenerateAllowedSet( const uint32_t         setCount,
450                              const uint32_t         pieceCount,
451                              const uint8_t          infohash[20],
452                              const struct in_addr * ip )
453{
454    /* This has been checked against the spec example implementation. Feeding it with :
455    setCount = 9, pieceCount = 1313, infohash = Oxaa,0xaa,...0xaa, ip = 80.4.4.200
456generate :
457    1059, 431, 808, 1217, 287, 376, 1188, 353, 508
458    but since we're storing in a bitfield, it won't be in this order... */
459    /* TODO : We should translate link-local IPv4 adresses to external IP,
460     * so that being on same local network gives us the same allowed pieces */
461   
462    printf( "%d piece allowed fast set for torrent with %d pieces and hex infohash\n", setCount, pieceCount );
463    printf( "%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x for node with IP %s:\n",
464            infohash[0], infohash[1], infohash[2], infohash[3], infohash[4], infohash[5], infohash[6], infohash[7], infohash[8], infohash[9],
465            infohash[10], infohash[11], infohash[12], infohash[13], infohash[14], infohash[15], infohash[16], infohash[7], infohash[18], infohash[19],
466            inet_ntoa( *ip ) );
467   
468    uint8_t *seed = malloc(4 + SHA_DIGEST_LENGTH);
469    char buf[4];
470    uint32_t allowedPieceCount = 0;
471    tr_bitfield_t * ret;
472   
473    ret = tr_bitfieldNew( pieceCount );
474   
475    /* We need a seed based on most significant bytes of peer address
476        concatenated with torrent's infohash */
477    *(uint32_t*)buf = ntohl( htonl(ip->s_addr) & 0xffffff00 );
478   
479    memcpy( seed, &buf, 4 );
480    memcpy( seed + 4, infohash, SHA_DIGEST_LENGTH );
481   
482    tr_sha1( seed, seed, 4 + SHA_DIGEST_LENGTH, NULL );
483   
484    while ( allowedPieceCount < setCount )
485    {
486        int i;
487        for ( i = 0 ; i < 5 && allowedPieceCount < setCount ; i++ )
488        {
489            /* We generate indices from 4-byte chunks of the seed */
490            uint32_t j = i * 4;
491            uint32_t y = ntohl( *(uint32_t*)(seed + j) );
492            uint32_t index = y % pieceCount;
493           
494            if ( !tr_bitfieldHas( ret, index ) )
495            {
496                tr_bitfieldAdd( ret, index );
497                allowedPieceCount++;
498            }
499        }
500        /* We randomize the seed, in case we need to iterate more */
501        tr_sha1( seed, seed, SHA_DIGEST_LENGTH, NULL );
502    }
503    tr_free( seed );
504   
505    return ret;
506}
507
508tr_peerMgr*
509tr_peerMgrNew( tr_handle * handle )
510{
511    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
512    m->handle = handle;
513    m->torrents = tr_ptrArrayNew( );
514    m->incomingHandshakes = tr_ptrArrayNew( );
515    return m;
516}
517
518void
519tr_peerMgrFree( tr_peerMgr * manager )
520{
521    managerLock( manager );
522
523    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
524     * the item from manager->handshakes, so this is a little roundabout... */
525    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
526        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
527    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
528
529    /* free the torrents. */
530    tr_ptrArrayFree( manager->torrents, (PtrArrayForeachFunc)torrentDestructor );
531
532    managerUnlock( manager );
533    tr_free( manager );
534}
535
536static tr_peer**
537getConnectedPeers( Torrent * t, int * setmeCount )
538{
539    int i, peerCount, connectionCount;
540    tr_peer **peers;
541    tr_peer **ret;
542
543    assert( torrentIsLocked( t ) );
544
545    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
546    ret = tr_new( tr_peer*, peerCount );
547
548    for( i=connectionCount=0; i<peerCount; ++i )
549        if( peers[i]->msgs != NULL )
550            ret[connectionCount++] = peers[i];
551
552    *setmeCount = connectionCount;
553    return ret;
554}
555
556/***
557****  Refill
558***/
559
560struct tr_refill_piece
561{
562    tr_priority_t priority;
563    uint16_t random;
564    uint32_t piece;
565    uint32_t peerCount;
566    uint32_t fastAllowed;
567};
568
569static int
570compareRefillPiece (const void * aIn, const void * bIn)
571{
572    const struct tr_refill_piece * a = aIn;
573    const struct tr_refill_piece * b = bIn;
574
575    /* if one piece has a higher priority, it goes first */
576    if (a->priority != b->priority)
577        return a->priority > b->priority ? -1 : 1;
578
579    /* otherwise if one has fewer peers, it goes first */
580    if (a->peerCount != b->peerCount)
581        return a->peerCount < b->peerCount ? -1 : 1;
582   
583    /* otherwise if one *might be* fastallowed to us */
584    if (a->fastAllowed != b->fastAllowed)
585        return a->fastAllowed < b->fastAllowed ? -1 : 1;
586
587    /* otherwise go with our random seed */
588    return tr_compareUint16( a->random, b->random );
589}
590
591static int
592isPieceInteresting( const tr_torrent  * tor,
593                    int                 piece )
594{
595    if( tor->info.pieces[piece].dnd ) /* we don't want it */
596        return 0;
597
598    if( tr_cpPieceIsComplete( tor->completion, piece ) ) /* we already have it */
599        return 0;
600
601    return 1;
602}
603
604static uint32_t*
605getPreferredPieces( Torrent     * t,
606                    uint32_t    * pieceCount )
607{
608    const tr_torrent * tor = t->tor;
609    const tr_info * inf = &tor->info;
610    int i;
611    uint32_t poolSize = 0;
612    uint32_t * pool = tr_new( uint32_t, inf->pieceCount );
613    int peerCount;
614    tr_peer** peers;
615
616    assert( torrentIsLocked( t ) );
617
618    peers = getConnectedPeers( t, &peerCount );
619
620    for( i=0; i<inf->pieceCount; ++i )
621        if( isPieceInteresting( tor, i ) )
622            pool[poolSize++] = i;
623
624    /* sort the pool from most interesting to least... */
625    if( poolSize > 1 )
626    {
627        uint32_t j;
628        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
629
630        for( j=0; j<poolSize; ++j )
631        {
632            int k;
633            const int piece = pool[j];
634            struct tr_refill_piece * setme = p + j;
635
636            setme->piece = piece;
637            setme->priority = inf->pieces[piece].priority;
638            setme->peerCount = 0;
639            setme->fastAllowed = 0;
640            setme->random = tr_rand( UINT16_MAX );
641            /* FIXME */
642//            setme->fastAllowed = tr_bitfieldHas( t->tor->allowedList, i);
643
644            for( k=0; k<peerCount; ++k ) {
645                const tr_peer * peer = peers[k];
646                if( peer->peerIsInterested && !peer->clientIsChoked && tr_bitfieldHas( peer->have, piece ) )
647                    ++setme->peerCount;
648            }
649        }
650
651        qsort (p, poolSize, sizeof(struct tr_refill_piece), compareRefillPiece);
652
653        for( j=0; j<poolSize; ++j )
654            pool[j] = p[j].piece;
655
656        tr_free( p );
657    }
658
659    tr_free( peers );
660
661    *pieceCount = poolSize;
662    return pool;
663}
664
665static uint64_t*
666getPreferredBlocks( Torrent * t, uint64_t * setmeCount )
667{
668    uint32_t i;
669    uint32_t pieceCount;
670    uint32_t * pieces;
671    uint64_t *req, *unreq, *ret, *walk;
672    int reqCount, unreqCount;
673    const tr_torrent * tor = t->tor;
674
675    assert( torrentIsLocked( t ) );
676
677    pieces = getPreferredPieces( t, &pieceCount );
678
679    req = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
680    reqCount = 0;
681    unreq = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
682    unreqCount = 0;
683
684    for( i=0; i<pieceCount; ++i ) {
685        const uint32_t index = pieces[i];
686        const int begin = tr_torPieceFirstBlock( tor, index );
687        const int end = begin + tr_torPieceCountBlocks( tor, (int)index );
688        int block;
689        for( block=begin; block<end; ++block )
690            if( tr_cpBlockIsComplete( tor->completion, block ) )
691                continue;
692            else if( tr_bitfieldHas( t->requested, block ) )
693                req[reqCount++] = block;
694            else
695                unreq[unreqCount++] = block;
696    }
697
698    ret = walk = tr_new( uint64_t, unreqCount + reqCount );
699    memcpy( walk, unreq, sizeof(uint64_t) * unreqCount );
700    walk += unreqCount;
701    memcpy( walk, req, sizeof(uint64_t) * reqCount );
702    walk += reqCount;
703    assert( ( walk - ret ) == ( unreqCount + reqCount ) );
704    *setmeCount = walk - ret;
705
706    tr_free( req );
707    tr_free( unreq );
708    tr_free( pieces );
709
710    return ret;
711}
712
713static int
714refillPulse( void * vtorrent )
715{
716    Torrent * t = vtorrent;
717    tr_torrent * tor = t->tor;
718    uint32_t i;
719    int peerCount;
720    tr_peer ** peers;
721    uint64_t blockCount;
722    uint64_t * blocks;
723
724    if( !t->isRunning )
725        return TRUE;
726    if( tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE )
727        return TRUE;
728
729    torrentLock( t );
730
731    blocks = getPreferredBlocks( t, &blockCount );
732    peers = getConnectedPeers( t, &peerCount );
733
734    for( i=0; peerCount && i<blockCount; ++i )
735    {
736        const int block = blocks[i];
737        const uint32_t index = tr_torBlockPiece( tor, block );
738        const uint32_t begin = (block * tor->blockSize) - (index * tor->info.pieceSize);
739        const uint32_t length = tr_torBlockCountBytes( tor, block );
740        int j;
741        assert( _tr_block( tor, index, begin ) == block );
742        assert( begin < (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
743        assert( (begin + length) <= (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
744
745
746        /* find a peer who can ask for this block */
747        for( j=0; j<peerCount; )
748        {
749            const int val = tr_peerMsgsAddRequest( peers[j]->msgs, index, begin, length );
750            switch( val )
751            {
752                case TR_ADDREQ_FULL: 
753                case TR_ADDREQ_CLIENT_CHOKED:
754                    memmove( peers+j, peers+j+1, sizeof(tr_peer*)*(--peerCount-j) );
755                    break;
756
757                case TR_ADDREQ_MISSING: 
758                    ++j;
759                    break;
760
761                case TR_ADDREQ_OK:
762                    tr_bitfieldAdd( t->requested, block );
763                    j = peerCount;
764                    break;
765
766                default:
767                    assert( 0 && "unhandled value" );
768                    break;
769            }
770        }
771    }
772
773    /* cleanup */
774    tr_free( peers );
775    tr_free( blocks );
776
777    t->refillTimer = NULL;
778
779    torrentUnlock( t );
780    return FALSE;
781}
782
783static void
784broadcastClientHave( Torrent * t, uint32_t index )
785{
786    int i, size;
787    tr_peer ** peers;
788
789    assert( torrentIsLocked( t ) );
790
791    peers = getConnectedPeers( t, &size );
792    for( i=0; i<size; ++i )
793        tr_peerMsgsHave( peers[i]->msgs, index );
794    tr_free( peers );
795}
796
797static void
798broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
799{
800    int i, size;
801    tr_peer ** peers;
802
803    assert( torrentIsLocked( t ) );
804
805    peers = getConnectedPeers( t, &size );
806    for( i=0; i<size; ++i )
807        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
808    tr_free( peers );
809}
810
811static void
812msgsCallbackFunc( void * vpeer, void * vevent, void * vt )
813{
814    tr_peer * peer = vpeer;
815    Torrent * t = (Torrent *) vt;
816    const tr_peermsgs_event * e = (const tr_peermsgs_event *) vevent;
817
818    torrentLock( t );
819
820    switch( e->eventType )
821    {
822        case TR_PEERMSG_NEED_REQ:
823            if( t->refillTimer == NULL )
824                t->refillTimer = tr_timerNew( t->manager->handle,
825                                              refillPulse, t,
826                                              REFILL_PERIOD_MSEC );
827            break;
828
829        case TR_PEERMSG_CLIENT_HAVE:
830            broadcastClientHave( t, e->pieceIndex );
831            tr_torrentRecheckCompleteness( t->tor );
832            break;
833
834        case TR_PEERMSG_PEER_PROGRESS: {
835            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
836            const int peerIsSeed = e->progress >= 1.0;
837            if( peerIsSeed )
838                atom->flags |= ADDED_F_SEED_FLAG;
839            else
840                atom->flags &= ~ADDED_F_SEED_FLAG;
841            break;
842        }
843
844        case TR_PEERMSG_CLIENT_BLOCK:
845            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
846            break;
847
848        case TR_PEERMSG_GOT_ERROR:
849            peer->doPurge = 1;
850            break;
851
852        default:
853            assert(0);
854    }
855
856    torrentUnlock( t );
857}
858
859static void
860ensureAtomExists( Torrent * t, const struct in_addr * addr, uint16_t port, uint8_t flags, uint8_t from )
861{
862    if( !peerIsKnown( t, addr ) )
863    {
864        struct peer_atom * a = tr_new( struct peer_atom, 1 );
865        a->addr = *addr;
866        a->port = port;
867        a->flags = flags;
868        a->from = from;
869        a->time = 0;
870        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr(&a->addr,a->port) );
871        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
872    }
873}
874
875/* FIXME: this is kind of a mess. */
876static void
877myHandshakeDoneCB( tr_handshake    * handshake,
878                   tr_peerIo       * io,
879                   int               isConnected,
880                   const uint8_t   * peer_id,
881                   void            * vmanager )
882{
883    int ok = isConnected;
884    uint16_t port;
885    const struct in_addr * in_addr;
886    tr_peerMgr * manager = (tr_peerMgr*) vmanager;
887    Torrent * t;
888    tr_handshake * ours;
889
890    assert( io != NULL );
891    assert( isConnected==0 || isConnected==1 );
892
893    t = tr_peerIoHasTorrentHash( io )
894        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
895        : NULL;
896
897    if( tr_peerIoIsIncoming ( io ) )
898        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
899                                        handshake, handshakeCompare );
900    else if( t != NULL )
901        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
902                                        handshake, handshakeCompare );
903    else
904        ours = handshake;
905
906    assert( ours != NULL );
907    assert( ours == handshake );
908
909    if( t != NULL )
910        torrentLock( t );
911
912    in_addr = tr_peerIoGetAddress( io, &port );
913
914    if( !t || !t->isRunning )
915    {
916        tr_peerIoFree( io );
917        --manager->connectionCount;
918    }
919    else if( !ok )
920    {
921        /* if we couldn't connect or were snubbed,
922         * the peer's probably not worth remembering. */
923        tr_peer * peer = getExistingPeer( t, in_addr );
924        tr_peerIoFree( io );
925        --manager->connectionCount;
926        if( peer )
927            peer->doPurge = 1;
928    }
929    else /* looking good */
930    {
931        tr_peer * peer = getPeer( t, in_addr );
932        uint16_t port;
933        const struct in_addr * addr = tr_peerIoGetAddress( io,  &port );
934        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
935        if( peer->msgs != NULL ) { /* we already have this peer */
936            tr_peerIoFree( io );
937            --manager->connectionCount;
938        } else {
939            peer->port = port;
940            peer->io = io;
941            peer->msgs = tr_peerMsgsNew( t->tor, peer );
942            tr_free( peer->client );
943            peer->client = peer_id ? tr_clientForId( peer_id ) : NULL;
944            peer->msgsTag = tr_peerMsgsSubscribe( peer->msgs, msgsCallbackFunc, t );
945        }
946    }
947
948    if( t != NULL )
949        torrentUnlock( t );
950}
951
952void
953tr_peerMgrAddIncoming( tr_peerMgr      * manager,
954                       struct in_addr  * addr,
955                       uint16_t          port,
956                       int               socket )
957{
958    managerLock( manager );
959
960    if( getExistingHandshake( manager->incomingHandshakes, addr ) == NULL )
961    {
962        tr_peerIo * io = tr_peerIoNewIncoming( manager->handle, addr, port, socket );
963
964        tr_handshake * handshake = tr_handshakeNew( io,
965                                                    manager->handle->encryptionMode,
966                                                    myHandshakeDoneCB,
967                                                    manager );
968        ++manager->connectionCount;
969
970        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake, handshakeCompare );
971    }
972
973    managerUnlock( manager );
974}
975
976void
977tr_peerMgrAddPex( tr_peerMgr     * manager,
978                  const uint8_t  * torrentHash,
979                  int              from,
980                  const tr_pex   * pex,
981                  int              pexCount )
982{
983    Torrent * t;
984    const tr_pex * end;
985
986    managerLock( manager );
987
988    t = getExistingTorrent( manager, torrentHash );
989    for( end=pex+pexCount; pex!=end; ++pex )
990        ensureAtomExists( t, &pex->in_addr, pex->port, pex->flags, from );
991
992    managerUnlock( manager );
993}
994
995void
996tr_peerMgrAddPeers( tr_peerMgr    * manager,
997                    const uint8_t * torrentHash,
998                    int             from,
999                    const uint8_t * peerCompact,
1000                    int             peerCount )
1001{
1002    int i;
1003    const uint8_t * walk = peerCompact;
1004    Torrent * t;
1005
1006    managerLock( manager );
1007
1008    t = getExistingTorrent( manager, torrentHash );
1009    for( i=0; t!=NULL && i<peerCount; ++i )
1010    {
1011        struct in_addr addr;
1012        uint16_t port;
1013        memcpy( &addr, walk, 4 ); walk += 4;
1014        memcpy( &port, walk, 2 ); walk += 2;
1015        ensureAtomExists( t, &addr, port, 0, from );
1016    }
1017
1018    managerUnlock( manager );
1019}
1020
1021/**
1022***
1023**/
1024
1025void
1026tr_peerMgrSetBlame( tr_peerMgr     * manager UNUSED,
1027                    const uint8_t  * torrentHash UNUSED,
1028                    int              pieceIndex UNUSED,
1029                    int              success UNUSED )
1030{
1031    /*fprintf( stderr, "FIXME: tr_peerMgrSetBlame\n" );*/
1032}
1033
1034int
1035tr_pexCompare( const void * va, const void * vb )
1036{
1037    const tr_pex * a = (const tr_pex *) va;
1038    const tr_pex * b = (const tr_pex *) vb;
1039    int i = memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
1040    if( i ) return i;
1041    if( a->port < b->port ) return -1;
1042    if( a->port > b->port ) return 1;
1043    return 0;
1044}
1045
1046int tr_pexCompare( const void * a, const void * b );
1047
1048static int
1049peerPrefersCrypto( const tr_peer * peer )
1050{
1051    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1052        return TRUE;
1053
1054    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1055        return FALSE;
1056
1057    return tr_peerIoIsEncrypted( peer->io );
1058};
1059
1060int
1061tr_peerMgrGetPeers( tr_peerMgr      * manager,
1062                    const uint8_t   * torrentHash,
1063                    tr_pex         ** setme_pex )
1064{
1065    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1066    int i, peerCount;
1067    const tr_peer ** peers;
1068    tr_pex * pex;
1069    tr_pex * walk;
1070
1071    torrentLock( (Torrent*)t );
1072
1073    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1074    pex = walk = tr_new( tr_pex, peerCount );
1075
1076    for( i=0; i<peerCount; ++i, ++walk )
1077    {
1078        const tr_peer * peer = peers[i];
1079
1080        walk->in_addr = peer->in_addr;
1081
1082        walk->port = peer->port;
1083
1084        walk->flags = 0;
1085        if( peerPrefersCrypto(peer) )  walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1086        if( peer->progress >= 1.0 )    walk->flags |= ADDED_F_SEED_FLAG;
1087    }
1088
1089    assert( ( walk - pex ) == peerCount );
1090    qsort( pex, peerCount, sizeof(tr_pex), tr_pexCompare );
1091    *setme_pex = pex;
1092
1093    torrentUnlock( (Torrent*)t );
1094
1095    return peerCount;
1096}
1097
1098void
1099tr_peerMgrStartTorrent( tr_peerMgr     * manager,
1100                        const uint8_t  * torrentHash )
1101{
1102    Torrent * t;
1103
1104    managerLock( manager );
1105
1106    t = getExistingTorrent( manager, torrentHash );
1107    t->isRunning = 1;
1108    restartChokeTimer( t );
1109    restartReconnectTimer( t );
1110
1111    managerUnlock( manager );
1112}
1113
1114static void
1115stopTorrent( Torrent * t )
1116{
1117    assert( torrentIsLocked( t ) );
1118
1119    t->isRunning = 0;
1120    tr_timerFree( &t->rechokeTimer );
1121    tr_timerFree( &t->reconnectTimer );
1122
1123    /* disconnect the peers. */
1124    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)freePeer );
1125    tr_ptrArrayClear( t->peers );
1126
1127    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1128     * which removes the handshake from t->outgoingHandshakes... */
1129    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1130        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1131}
1132void
1133tr_peerMgrStopTorrent( tr_peerMgr     * manager,
1134                       const uint8_t  * torrentHash)
1135{
1136    managerLock( manager );
1137
1138    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1139
1140    managerUnlock( manager );
1141}
1142
1143void
1144tr_peerMgrAddTorrent( tr_peerMgr * manager,
1145                      tr_torrent * tor )
1146{
1147    Torrent * t;
1148
1149    managerLock( manager );
1150
1151    assert( tor != NULL );
1152    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1153
1154    t = torrentConstructor( manager, tor );
1155    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1156
1157    managerUnlock( manager );
1158}
1159
1160void
1161tr_peerMgrRemoveTorrent( tr_peerMgr     * manager,
1162                         const uint8_t  * torrentHash )
1163{
1164    Torrent * t;
1165
1166    managerLock( manager );
1167
1168    t = getExistingTorrent( manager, torrentHash );
1169    assert( t != NULL );
1170    stopTorrent( t );
1171    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1172    torrentDestructor( t );
1173
1174    managerUnlock( manager );
1175}
1176
1177void
1178tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1179                               const uint8_t    * torrentHash,
1180                               int8_t           * tab,
1181                               int                tabCount )
1182{
1183    int i;
1184    const Torrent * t;
1185    const tr_torrent * tor;
1186    float interval;
1187
1188    managerLock( (tr_peerMgr*)manager );
1189
1190    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1191    tor = t->tor;
1192    interval = tor->info.pieceCount / (float)tabCount;
1193
1194    memset( tab, 0, tabCount );
1195
1196    for( i=0; i<tabCount; ++i )
1197    {
1198        const int piece = i * interval;
1199
1200        if( tor == NULL )
1201            tab[i] = 0;
1202        else if( tr_cpPieceIsComplete( tor->completion, piece ) )
1203            tab[i] = -1;
1204        else {
1205            int j, peerCount;
1206            const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1207            for( j=0; j<peerCount; ++j )
1208                if( tr_bitfieldHas( peers[j]->have, i ) )
1209                    ++tab[i];
1210        }
1211    }
1212
1213    managerUnlock( (tr_peerMgr*)manager );
1214}
1215
1216/* Returns the pieces that we and/or a connected peer has */
1217tr_bitfield*
1218tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1219                        const uint8_t    * torrentHash )
1220{
1221    int i, size;
1222    const Torrent * t;
1223    const tr_peer ** peers;
1224    tr_bitfield * pieces;
1225
1226    managerLock( (tr_peerMgr*)manager );
1227
1228    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1229    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1230    pieces = tr_bitfieldDup( tr_cpPieceBitfield( t->tor->completion ) );
1231    for( i=0; i<size; ++i )
1232        if( peers[i]->io != NULL )
1233            tr_bitfieldAnd( pieces, peers[i]->have );
1234
1235    managerUnlock( (tr_peerMgr*)manager );
1236    return pieces;
1237}
1238
1239void
1240tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1241                        const uint8_t    * torrentHash,
1242                        int              * setmePeersKnown,
1243                        int              * setmePeersConnected,
1244                        int              * setmePeersSendingToUs,
1245                        int              * setmePeersGettingFromUs,
1246                        int              * setmePeersFrom )
1247{
1248    int i, size;
1249    const Torrent * t;
1250    const tr_peer ** peers;
1251
1252    managerLock( (tr_peerMgr*)manager );
1253
1254    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1255    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1256
1257    *setmePeersKnown          = tr_ptrArraySize( t->pool );
1258    *setmePeersConnected      = 0;
1259    *setmePeersSendingToUs    = 0;
1260    *setmePeersGettingFromUs  = 0;
1261
1262    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1263        setmePeersFrom[i] = 0;
1264
1265    for( i=0; i<size; ++i )
1266    {
1267        const tr_peer * peer = peers[i];
1268        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1269
1270        if( peer->io == NULL ) /* not connected */
1271            continue;
1272
1273        ++*setmePeersConnected;
1274
1275        ++setmePeersFrom[atom->from];
1276
1277        if( tr_rcRate( peer->rateToPeer ) > 0.01 )
1278            ++*setmePeersGettingFromUs;
1279
1280        if( tr_rcRate( peer->rateToClient ) > 0.01 )
1281            ++*setmePeersSendingToUs;
1282    }
1283
1284    managerUnlock( (tr_peerMgr*)manager );
1285}
1286
1287struct tr_peer_stat *
1288tr_peerMgrPeerStats( const tr_peerMgr  * manager,
1289                     const uint8_t     * torrentHash,
1290                     int               * setmeCount UNUSED )
1291{
1292    int i, size;
1293    const Torrent * t;
1294    const tr_peer ** peers;
1295    tr_peer_stat * ret;
1296
1297    assert( manager != NULL );
1298    managerLock( (tr_peerMgr*)manager );
1299
1300    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1301    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1302
1303    ret = tr_new0( tr_peer_stat, size );
1304
1305    for( i=0; i<size; ++i )
1306    {
1307        const tr_peer * peer = peers[i];
1308        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1309        tr_peer_stat * stat = ret + i;
1310
1311        tr_netNtop( &peer->in_addr, stat->addr, sizeof(stat->addr) );
1312        stat->port             = peer->port;
1313        stat->from             = atom->from;
1314        stat->client           = tr_strdup( peer->client ? peer->client : "" );
1315        stat->progress         = peer->progress;
1316        stat->isEncrypted      = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1317        stat->uploadToRate     = tr_rcRate( peer->rateToPeer );
1318        stat->downloadFromRate = tr_rcRate( peer->rateToClient );
1319        stat->isDownloading    = stat->uploadToRate > 0.01;
1320        stat->isUploading      = stat->downloadFromRate > 0.01;
1321    }
1322
1323    *setmeCount = size;
1324
1325    managerUnlock( (tr_peerMgr*)manager );
1326    return ret;
1327}
1328
1329/**
1330***
1331**/
1332
1333typedef struct
1334{
1335    tr_peer * peer;
1336    float rate;
1337    int randomKey;
1338    int preferred;
1339    int doUnchoke;
1340}
1341ChokeData;
1342
1343static int
1344compareChoke( const void * va, const void * vb )
1345{
1346    const ChokeData * a = va;
1347    const ChokeData * b = vb;
1348
1349    if( a->preferred != b->preferred )
1350        return a->preferred ? -1 : 1;
1351
1352    if( a->preferred )
1353    {
1354        if( a->rate > b->rate ) return -1;
1355        if( a->rate < b->rate ) return 1;
1356        return 0;
1357    }
1358    else
1359    {
1360        return a->randomKey - b->randomKey;
1361    }
1362}
1363
1364static int
1365clientIsSnubbedBy( const tr_peer * peer )
1366{
1367    assert( peer != NULL );
1368
1369    return peer->peerSentPieceDataAt < (time(NULL) - SNUBBED_SEC);
1370}
1371
1372/**
1373***
1374**/
1375
1376static void
1377rechokeLeech( Torrent * t )
1378{
1379    int i, peerCount, size=0, unchoked=0;
1380    const time_t ignorePeersNewerThan = time(NULL) - MIN_CHOKE_PERIOD_SEC;
1381    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1382    ChokeData * choke = tr_new0( ChokeData, peerCount );
1383
1384    assert( torrentIsLocked( t ) );
1385   
1386    /* sort the peers by preference and rate */
1387    for( i=0; i<peerCount; ++i )
1388    {
1389        tr_peer * peer = peers[i];
1390        ChokeData * node;
1391        if( peer->chokeChangedAt > ignorePeersNewerThan )
1392            continue;
1393
1394        node = &choke[size++];
1395        node->peer = peer;
1396        node->preferred = peer->peerIsInterested && !clientIsSnubbedBy(peer);
1397        node->randomKey = tr_rand( INT_MAX );
1398        node->rate = (3*tr_rcRate(peer->rateToPeer))
1399                   + (1*tr_rcRate(peer->rateToClient));
1400    }
1401
1402    qsort( choke, size, sizeof(ChokeData), compareChoke );
1403
1404    for( i=0; i<size && i<NUM_UNCHOKED_PEERS_PER_TORRENT; ++i ) {
1405        choke[i].doUnchoke = 1;
1406        ++unchoked;
1407    }
1408
1409    for( ; i<size; ++i ) {
1410        choke[i].doUnchoke = 1;
1411        ++unchoked;
1412        if( choke[i].peer->peerIsInterested )
1413            break;
1414    }
1415
1416    for( i=0; i<size; ++i )
1417        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1418
1419    /* cleanup */
1420    tr_free( choke );
1421    tr_free( peers );
1422}
1423
1424static void
1425rechokeSeed( Torrent * t )
1426{
1427    int i, size;
1428    tr_peer ** peers;
1429
1430    assert( torrentIsLocked( t ) );
1431
1432    peers = getConnectedPeers( t, &size );
1433
1434    /* FIXME */
1435    for( i=0; i<size; ++i )
1436        tr_peerMsgsSetChoke( peers[i]->msgs, FALSE );
1437
1438    tr_free( peers );
1439}
1440
1441static int
1442rechokePulse( void * vtorrent )
1443{
1444    Torrent * t = vtorrent;
1445    torrentLock( t );
1446
1447    const int done = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1448    if( done )
1449        rechokeLeech( vtorrent );
1450    else
1451        rechokeSeed( vtorrent );
1452
1453    torrentUnlock( t );
1454    return TRUE;
1455}
1456
1457/***
1458****
1459****
1460****
1461***/
1462
1463struct tr_connection
1464{
1465    tr_peer * peer;
1466    double throughput;
1467};
1468
1469#define LAISSEZ_FAIRE_PERIOD_SECS 90
1470
1471static int
1472compareConnections( const void * va, const void * vb )
1473{
1474    const struct tr_connection * a = va;
1475    const struct tr_connection * b = vb;
1476    if( a->throughput < b->throughput ) return -1;
1477    if( a->throughput > b->throughput ) return 1;
1478    return 0;
1479}
1480
1481static struct tr_connection *
1482getWeakConnections( Torrent * t, int * setmeSize )
1483{
1484    int i, insize, outsize;
1485    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( t->peers, &insize );
1486    struct tr_connection * ret = tr_new( struct tr_connection, insize );
1487    const int clientIsSeed = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1488    const time_t now = time( NULL );
1489
1490    assert( torrentIsLocked( t ) );
1491
1492    for( i=outsize=0; i<insize; ++i )
1493    {
1494        tr_peer * peer = peers[i];
1495        int isWeak;
1496        const int peerIsSeed = peer->progress >= 1.0;
1497        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1498        const double throughput = (3*tr_rcRate(peer->rateToPeer))
1499                                + (1*tr_rcRate(peer->rateToClient));
1500
1501        assert( atom != NULL );
1502
1503        if( throughput >= 3 )
1504            isWeak = FALSE;
1505        else if( peer->doPurge )
1506            isWeak = TRUE;
1507        else if( peerIsSeed && clientIsSeed && (now-atom->time >= 30) ) /* pex time */
1508            isWeak = TRUE;
1509        else if( ( now - atom->time ) < LAISSEZ_FAIRE_PERIOD_SECS )
1510            isWeak = FALSE;
1511#if 0
1512        else
1513            isWeak = TRUE;
1514#else
1515        else if( now - peer->pieceDataActivityDate > 180 )
1516            isWeak = TRUE;
1517        else
1518            isWeak = FALSE;
1519#endif
1520
1521        if( isWeak )
1522        {
1523            ret[outsize].peer = peer;
1524            ret[outsize].throughput = throughput;
1525            ++outsize;
1526        }
1527    }
1528
1529    qsort( ret, outsize, sizeof(struct tr_connection), compareConnections );
1530    *setmeSize = outsize;
1531    return ret;
1532}
1533
1534static int
1535compareAtomByTime( const void * va, const void * vb )
1536{
1537    const struct peer_atom * a = * (const struct peer_atom**) va;
1538    const struct peer_atom * b = * (const struct peer_atom**) vb;
1539    if( a->time < b->time ) return -1;
1540    if( a->time > b->time ) return 1;
1541    return 0;
1542}
1543
1544static struct peer_atom **
1545getPeerCandidates( Torrent * t, int * setmeSize )
1546{
1547    int i, insize, outsize;
1548    struct peer_atom ** atoms;
1549    struct peer_atom ** ret;
1550    const time_t now = time( NULL );
1551    const int seed = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1552
1553    assert( torrentIsLocked( t ) );
1554
1555    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &insize );
1556    ret = tr_new( struct peer_atom*, insize );
1557    for( i=outsize=0; i<insize; ++i )
1558    {
1559        struct peer_atom * atom = atoms[i];
1560
1561        /* we don't need two connections to the same peer... */
1562        if( peerIsInUse( t, &atom->addr ) ) {
1563            tordbg( t, "RECONNECT peer %d (%s) is in use...\n", i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1564            continue;
1565        }
1566
1567        /* no need to connect if we're both seeds... */
1568        if( seed && (atom->flags & ADDED_F_SEED_FLAG) ) {
1569            tordbg( t, "RECONNECT peer %d (%s) is a seed and so are we...\n", i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1570            continue;
1571        }
1572
1573        /* if we used this peer recently, give someone else a turn */
1574        if( ( now - atom->time ) <  LAISSEZ_FAIRE_PERIOD_SECS ) {
1575            tordbg( t, "RECONNECT peer %d (%s) is in its grace period...\n", i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1576            continue;
1577        }
1578
1579        ret[outsize++] = atom;
1580    }
1581
1582    qsort( ret, outsize, sizeof(struct peer_atom*), compareAtomByTime );
1583    *setmeSize = outsize;
1584    return ret;
1585}
1586
1587static int
1588reconnectPulse( void * vtorrent )
1589{
1590    Torrent * t = vtorrent;
1591
1592    torrentLock( t );
1593
1594    if( !t->isRunning )
1595    {
1596        removeAllPeers( t );
1597    }
1598    else
1599    {
1600        int i, nCandidates, nConnections, nAdd;
1601        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
1602        struct tr_connection * connections = getWeakConnections( t, &nConnections );
1603        const int peerCount = tr_ptrArraySize( t->peers );
1604
1605        tordbg( t, "RECONNECT pulse for [%s]: %d weak connections, %d connection candidates, %d atoms, max per pulse is %d\n",
1606                 t->tor->info.name, nConnections, nCandidates, tr_ptrArraySize(t->pool), (int)MAX_RECONNECTIONS_PER_PULSE );
1607
1608        for( i=0; i<nConnections; ++i )
1609            tordbg( t, "connection #%d: %s @ %.2f\n", i+1,
1610                     tr_peerIoAddrStr( &connections[i].peer->in_addr, connections[i].peer->port ), connections[i].throughput );
1611
1612        /* disconnect some peers */
1613        for( i=0; i<nConnections && i<(peerCount-5) && i<MAX_RECONNECTIONS_PER_PULSE; ++i ) {
1614            const double throughput = connections[i].throughput;
1615            tr_peer * peer = connections[i].peer;
1616            tordbg( t, "RECONNECT culling peer %s, whose throughput was %f\n",
1617                       tr_peerIoAddrStr(&peer->in_addr, peer->port), throughput );
1618            removePeer( t, peer );
1619        }
1620
1621        /* add some new ones */
1622        nAdd = MAX_CONNECTED_PEERS_PER_TORRENT - peerCount;
1623        for( i=0; i<nAdd && i<nCandidates && i<MAX_RECONNECTIONS_PER_PULSE; ++i )
1624        {
1625            tr_peerMgr * mgr = t->manager;
1626
1627            struct peer_atom * atom = candidates[i];
1628
1629            tr_peerIo * io = tr_peerIoNewOutgoing( mgr->handle, &atom->addr, atom->port, t->hash );
1630
1631            tr_handshake * handshake = tr_handshakeNew( io,
1632                                                        mgr->handle->encryptionMode,
1633                                                        myHandshakeDoneCB,
1634                                                        mgr );
1635            ++mgr->connectionCount;
1636
1637            assert( tr_peerIoGetTorrentHash( io ) != NULL );
1638
1639            tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake, handshakeCompare );
1640
1641            atom->time = time( NULL );
1642        }
1643
1644        /* cleanup */
1645        tr_free( connections );
1646        tr_free( candidates );
1647    }
1648
1649    torrentUnlock( t );
1650    return TRUE;
1651}
Note: See TracBrowser for help on using the repository browser.