source: trunk/libtransmission/peer-mgr.c @ 3274

Last change on this file since 3274 was 3274, checked in by charles, 15 years ago

tweak the peer manager code a little more.

  • Property svn:keywords set to Date Rev Author Id
File size: 43.5 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 3274 2007-10-02 19:54:14Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17#include <limits.h> /* INT_MAX */
18
19#include <sys/types.h> /* event.h needs this */
20#include <event.h>
21
22#include "transmission.h"
23#include "clients.h"
24#include "completion.h"
25#include "crypto.h"
26#include "handshake.h"
27#include "net.h"
28#include "peer-io.h"
29#include "peer-mgr.h"
30#include "peer-mgr-private.h"
31#include "peer-msgs.h"
32#include "platform.h"
33#include "ptrarray.h"
34#include "ratecontrol.h"
35#include "shared.h"
36#include "trevent.h"
37#include "utils.h"
38
39enum
40{
41    /* how frequently to change which peers are choked */
42    RECHOKE_PERIOD_MSEC = (15 * 1000),
43
44    /* how frequently to decide which peers live and die */
45    RECONNECT_PERIOD_MSEC = (5 * 1000),
46
47    /* how frequently to refill peers' request lists */
48    REFILL_PERIOD_MSEC = 1000,
49
50    /* how many peers to unchoke per-torrent. */
51    /* FIXME: make this user-configurable? */
52    NUM_UNCHOKED_PEERS_PER_TORRENT = 8,
53
54    /* don't change a peer's choke status more often than this */
55    MIN_CHOKE_PERIOD_SEC = 10,
56
57    /* following the BT spec, we consider ourselves `snubbed' if
58     * we're we don't get piece data from a peer in this long */
59    SNUBBED_SEC = 60,
60
61    /* this is arbitrary and, hopefully, temporary until we come up
62     * with a better idea for managing the connection limits */
63    MAX_CONNECTED_PEERS_PER_TORRENT = 60,
64
65    /* another arbitrary number */
66    MAX_RECONNECTIONS_PER_MINUTE = 60,
67
68    MAX_RECONNECTIONS_PER_PULSE =
69        ((MAX_RECONNECTIONS_PER_MINUTE * RECONNECT_PERIOD_MSEC) / (60*1000)),
70
71    /* corresponds to ut_pex's added.f flags */
72    ADDED_F_ENCRYPTION_FLAG = 1,
73
74    /* corresponds to ut_pex's added.f flags */
75    ADDED_F_SEED_FLAG = 2
76};
77
78
79/**
80***
81**/
82
83/* We keep one of these for every peer we know about, whether
84 * it's connected or not, so the struct must be small.
85 * When our current connections underperform, we dip back
86 * into this list for new ones. */
87struct peer_atom
88{   
89    uint8_t from;
90    uint8_t flags; /* these match the added_f flags */
91    uint16_t port;
92    struct in_addr addr; 
93    time_t time;
94};
95
96typedef struct
97{
98    uint8_t hash[SHA_DIGEST_LENGTH];
99    tr_ptrArray * outgoingHandshakes; /* tr_handshake */
100    tr_ptrArray * pool; /* struct peer_atom */
101    tr_ptrArray * peers; /* tr_peer */
102    tr_timer * reconnectTimer;
103    tr_timer * rechokeTimer;
104    tr_timer * refillTimer;
105    tr_torrent * tor;
106    tr_bitfield * requested;
107
108    unsigned int isRunning : 1;
109
110    struct tr_peerMgr * manager;
111}
112Torrent;
113
114struct tr_peerMgr
115{
116    int connectionCount;
117    tr_handle * handle;
118    tr_ptrArray * torrents; /* Torrent */
119    tr_ptrArray * incomingHandshakes; /* tr_handshake */
120};
121
122/**
123***
124**/
125
126static void
127myDebug( const char * file, int line, const Torrent * t, const char * fmt, ... )
128{
129    FILE * fp = tr_getLog( );
130    if( fp != NULL )
131    {
132        va_list args;
133        struct evbuffer * buf = evbuffer_new( );
134        evbuffer_add_printf( buf, "[%s:%d] %s ", file, line, t->tor->info.name  );
135        va_start( args, fmt );
136        evbuffer_add_vprintf( buf, fmt, args );
137        va_end( args );
138        fprintf( fp, "%s\n", EVBUFFER_DATA(buf) );
139        evbuffer_free( buf );
140    }
141}
142
143#define tordbg(t, fmt...) myDebug(__FILE__, __LINE__, t, ##fmt )
144
145/**
146***
147**/
148
149static void
150managerLock( struct tr_peerMgr * manager )
151{
152    tr_globalLock( manager->handle );
153}
154static void
155managerUnlock( struct tr_peerMgr * manager )
156{
157    tr_globalUnlock( manager->handle );
158}
159static void
160torrentLock( Torrent * torrent )
161{
162    managerLock( torrent->manager );
163}
164static void
165torrentUnlock( Torrent * torrent )
166{
167    managerUnlock( torrent->manager );
168}
169static int
170torrentIsLocked( const Torrent * t )
171{
172    return tr_globalIsLocked( t->manager->handle );
173}
174
175/**
176***
177**/
178
179static int
180compareAddresses( const struct in_addr * a, const struct in_addr * b )
181{
182    return tr_compareUint32( a->s_addr, b->s_addr );
183}
184
185static int
186handshakeCompareToAddr( const void * va, const void * vb )
187{
188    const tr_handshake * a = va;
189    return compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
190}
191
192static int
193handshakeCompare( const void * a, const void * b )
194{
195    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
196}
197
198static tr_handshake*
199getExistingHandshake( tr_ptrArray * handshakes, const struct in_addr * in_addr )
200{
201    return tr_ptrArrayFindSorted( handshakes,
202                                  in_addr,
203                                  handshakeCompareToAddr );
204}
205
206static int
207comparePeerAtomToAddress( const void * va, const void * vb )
208{
209    const struct peer_atom * a = va;
210    return compareAddresses( &a->addr, vb );
211}
212
213static int
214comparePeerAtoms( const void * va, const void * vb )
215{
216    const struct peer_atom * b = vb;
217    return comparePeerAtomToAddress( va, &b->addr );
218}
219
220/**
221***
222**/
223
224static int
225torrentCompare( const void * va, const void * vb )
226{
227    const Torrent * a = va;
228    const Torrent * b = vb;
229    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
230}
231
232static int
233torrentCompareToHash( const void * va, const void * vb )
234{
235    const Torrent * a = (const Torrent*) va;
236    const uint8_t * b_hash = (const uint8_t*) vb;
237    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
238}
239
240static Torrent*
241getExistingTorrent( tr_peerMgr * manager, const uint8_t * hash )
242{
243    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
244                                             hash,
245                                             torrentCompareToHash );
246}
247
248static int
249peerCompare( const void * va, const void * vb )
250{
251    const tr_peer * a = (const tr_peer *) va;
252    const tr_peer * b = (const tr_peer *) vb;
253    return compareAddresses( &a->in_addr, &b->in_addr );
254}
255
256static int
257peerCompareToAddr( const void * va, const void * vb )
258{
259    const tr_peer * a = (const tr_peer *) va;
260    return compareAddresses( &a->in_addr, vb );
261}
262
263static tr_peer*
264getExistingPeer( Torrent * torrent, const struct in_addr * in_addr )
265{
266    assert( torrentIsLocked( torrent ) );
267    assert( in_addr != NULL );
268
269    return (tr_peer*) tr_ptrArrayFindSorted( torrent->peers,
270                                             in_addr,
271                                             peerCompareToAddr );
272}
273
274static struct peer_atom*
275getExistingAtom( const Torrent * t, const struct in_addr * addr )
276{
277    assert( torrentIsLocked( t ) );
278    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
279}
280
281static int
282peerIsKnown( const Torrent * t, const struct in_addr * addr )
283{
284    return getExistingAtom( t, addr ) != NULL;
285}
286
287static int
288peerIsInUse( const Torrent * ct, const struct in_addr * addr )
289{
290    Torrent * t = (Torrent*) ct;
291
292    assert( torrentIsLocked ( t ) );
293
294    return ( getExistingPeer( t, addr ) != NULL )
295        || ( getExistingHandshake( t->outgoingHandshakes, addr ) != NULL )
296        || ( getExistingHandshake( t->manager->incomingHandshakes, addr ) != NULL );
297}
298
299static tr_peer*
300getPeer( Torrent * torrent, const struct in_addr * in_addr )
301{
302    tr_peer * peer;
303
304    assert( torrentIsLocked( torrent ) );
305
306    peer = getExistingPeer( torrent, in_addr );
307
308    if( peer == NULL )
309    {
310        peer = tr_new0( tr_peer, 1 );
311        peer->rateToClient = tr_rcInit( );
312        peer->rateToPeer = tr_rcInit( );
313        memcpy( &peer->in_addr, in_addr, sizeof(struct in_addr) );
314        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
315    }
316
317    return peer;
318}
319
320static void
321disconnectPeer( tr_peer * peer )
322{
323    assert( peer != NULL );
324
325    tr_peerIoFree( peer->io );
326    peer->io = NULL;
327
328    if( peer->msgs != NULL )
329    {
330        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
331        tr_peerMsgsFree( peer->msgs );
332        peer->msgs = NULL;
333    }
334
335    tr_bitfieldFree( peer->have );
336    peer->have = NULL;
337
338    tr_bitfieldFree( peer->blame );
339    peer->blame = NULL;
340
341    tr_bitfieldFree( peer->banned );
342    peer->banned = NULL;
343}
344
345static void
346freePeer( tr_peer * peer )
347{
348    disconnectPeer( peer );
349    tr_rcClose( peer->rateToClient );
350    tr_rcClose( peer->rateToPeer );
351    tr_free( peer->client );
352    tr_free( peer );
353}
354
355static void
356removePeer( Torrent * t, tr_peer * peer )
357{
358    tr_peer * removed;
359    struct peer_atom * atom;
360
361    assert( torrentIsLocked( t ) );
362
363    atom = getExistingAtom( t, &peer->in_addr );
364    assert( atom != NULL );
365    atom->time = time( NULL );
366
367    removed = tr_ptrArrayRemoveSorted  ( t->peers, peer, peerCompare );
368    assert( removed == peer );
369    freePeer( removed );
370}
371
372static void
373removeAllPeers( Torrent * t )
374{
375    while( !tr_ptrArrayEmpty( t->peers ) )
376        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
377}
378
379/* torrent must have already been removed from manager->torrents */
380static void
381torrentDestructor( Torrent * t )
382{
383    uint8_t hash[SHA_DIGEST_LENGTH];
384
385    assert( t != NULL );
386    assert( !t->isRunning );
387    assert( t->peers != NULL );
388    assert( torrentIsLocked( t ) );
389    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
390    assert( tr_ptrArrayEmpty( t->peers ) );
391
392    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
393
394    tr_timerFree( &t->reconnectTimer );
395    tr_timerFree( &t->rechokeTimer );
396    tr_timerFree( &t->refillTimer );
397
398    tr_bitfieldFree( t->requested );
399    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
400    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
401
402    tr_free( t );
403}
404
405static Torrent*
406torrentConstructor( tr_peerMgr * manager, tr_torrent * tor )
407{
408    Torrent * t;
409
410    t = tr_new0( Torrent, 1 );
411    t->manager = manager;
412    t->tor = tor;
413    t->pool = tr_ptrArrayNew( );
414    t->peers = tr_ptrArrayNew( );
415    t->outgoingHandshakes = tr_ptrArrayNew( );
416    t->requested = tr_bitfieldNew( tor->blockCount );
417    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
418
419    return t;
420}
421
422/**
423***
424**/
425
426struct tr_bitfield *
427tr_peerMgrGenerateAllowedSet( const uint32_t         setCount,
428                              const uint32_t         pieceCount,
429                              const uint8_t          infohash[20],
430                              const struct in_addr * ip )
431{
432    /* This has been checked against the spec example implementation. Feeding it with :
433    setCount = 9, pieceCount = 1313, infohash = Oxaa,0xaa,...0xaa, ip = 80.4.4.200
434generate :
435    1059, 431, 808, 1217, 287, 376, 1188, 353, 508
436    but since we're storing in a bitfield, it won't be in this order... */
437    /* TODO : We should translate link-local IPv4 adresses to external IP,
438     * so that being on same local network gives us the same allowed pieces */
439   
440    printf( "%d piece allowed fast set for torrent with %d pieces and hex infohash\n", setCount, pieceCount );
441    printf( "%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x for node with IP %s:\n",
442            infohash[0], infohash[1], infohash[2], infohash[3], infohash[4], infohash[5], infohash[6], infohash[7], infohash[8], infohash[9],
443            infohash[10], infohash[11], infohash[12], infohash[13], infohash[14], infohash[15], infohash[16], infohash[7], infohash[18], infohash[19],
444            inet_ntoa( *ip ) );
445   
446    uint8_t *seed = malloc(4 + SHA_DIGEST_LENGTH);
447    char buf[4];
448    uint32_t allowedPieceCount = 0;
449    tr_bitfield_t * ret;
450   
451    ret = tr_bitfieldNew( pieceCount );
452   
453    /* We need a seed based on most significant bytes of peer address
454        concatenated with torrent's infohash */
455    *(uint32_t*)buf = ntohl( htonl(ip->s_addr) & 0xffffff00 );
456   
457    memcpy( seed, &buf, 4 );
458    memcpy( seed + 4, infohash, SHA_DIGEST_LENGTH );
459   
460    tr_sha1( seed, seed, 4 + SHA_DIGEST_LENGTH, NULL );
461   
462    while ( allowedPieceCount < setCount )
463    {
464        int i;
465        for ( i = 0 ; i < 5 && allowedPieceCount < setCount ; i++ )
466        {
467            /* We generate indices from 4-byte chunks of the seed */
468            uint32_t j = i * 4;
469            uint32_t y = ntohl( *(uint32_t*)(seed + j) );
470            uint32_t index = y % pieceCount;
471           
472            if ( !tr_bitfieldHas( ret, index ) )
473            {
474                tr_bitfieldAdd( ret, index );
475                allowedPieceCount++;
476            }
477        }
478        /* We randomize the seed, in case we need to iterate more */
479        tr_sha1( seed, seed, SHA_DIGEST_LENGTH, NULL );
480    }
481    tr_free( seed );
482   
483    return ret;
484}
485
486tr_peerMgr*
487tr_peerMgrNew( tr_handle * handle )
488{
489    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
490    m->handle = handle;
491    m->torrents = tr_ptrArrayNew( );
492    m->incomingHandshakes = tr_ptrArrayNew( );
493    return m;
494}
495
496void
497tr_peerMgrFree( tr_peerMgr * manager )
498{
499    managerLock( manager );
500
501    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
502     * the item from manager->handshakes, so this is a little roundabout... */
503    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
504        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
505    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
506
507    /* free the torrents. */
508    tr_ptrArrayFree( manager->torrents, (PtrArrayForeachFunc)torrentDestructor );
509
510    managerUnlock( manager );
511    tr_free( manager );
512}
513
514static tr_peer**
515getConnectedPeers( Torrent * t, int * setmeCount )
516{
517    int i, peerCount, connectionCount;
518    tr_peer **peers;
519    tr_peer **ret;
520
521    assert( torrentIsLocked( t ) );
522
523    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
524    ret = tr_new( tr_peer*, peerCount );
525
526    for( i=connectionCount=0; i<peerCount; ++i )
527        if( peers[i]->msgs != NULL )
528            ret[connectionCount++] = peers[i];
529
530    *setmeCount = connectionCount;
531    return ret;
532}
533
534/***
535****  Refill
536***/
537
538struct tr_refill_piece
539{
540    tr_priority_t priority;
541    uint16_t random;
542    uint32_t piece;
543    uint32_t peerCount;
544    uint32_t fastAllowed;
545};
546
547static int
548compareRefillPiece (const void * aIn, const void * bIn)
549{
550    const struct tr_refill_piece * a = aIn;
551    const struct tr_refill_piece * b = bIn;
552
553    /* if one piece has a higher priority, it goes first */
554    if (a->priority != b->priority)
555        return a->priority > b->priority ? -1 : 1;
556
557    /* otherwise if one has fewer peers, it goes first */
558    if (a->peerCount != b->peerCount)
559        return a->peerCount < b->peerCount ? -1 : 1;
560   
561    /* otherwise if one *might be* fastallowed to us */
562    if (a->fastAllowed != b->fastAllowed)
563        return a->fastAllowed < b->fastAllowed ? -1 : 1;
564
565    /* otherwise go with our random seed */
566    return tr_compareUint16( a->random, b->random );
567}
568
569static int
570isPieceInteresting( const tr_torrent  * tor,
571                    int                 piece )
572{
573    if( tor->info.pieces[piece].dnd ) /* we don't want it */
574        return 0;
575
576    if( tr_cpPieceIsComplete( tor->completion, piece ) ) /* we already have it */
577        return 0;
578
579    return 1;
580}
581
582static uint32_t*
583getPreferredPieces( Torrent     * t,
584                    uint32_t    * pieceCount )
585{
586    const tr_torrent * tor = t->tor;
587    const tr_info * inf = &tor->info;
588    int i;
589    uint32_t poolSize = 0;
590    uint32_t * pool = tr_new( uint32_t, inf->pieceCount );
591    int peerCount;
592    tr_peer** peers;
593
594    assert( torrentIsLocked( t ) );
595
596    peers = getConnectedPeers( t, &peerCount );
597
598    for( i=0; i<inf->pieceCount; ++i )
599        if( isPieceInteresting( tor, i ) )
600            pool[poolSize++] = i;
601
602    /* sort the pool from most interesting to least... */
603    if( poolSize > 1 )
604    {
605        uint32_t j;
606        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
607
608        for( j=0; j<poolSize; ++j )
609        {
610            int k;
611            const int piece = pool[j];
612            struct tr_refill_piece * setme = p + j;
613
614            setme->piece = piece;
615            setme->priority = inf->pieces[piece].priority;
616            setme->peerCount = 0;
617            setme->fastAllowed = 0;
618            setme->random = tr_rand( UINT16_MAX );
619            /* FIXME */
620//            setme->fastAllowed = tr_bitfieldHas( t->tor->allowedList, i);
621
622            for( k=0; k<peerCount; ++k ) {
623                const tr_peer * peer = peers[k];
624                if( peer->peerIsInterested && !peer->clientIsChoked && tr_bitfieldHas( peer->have, piece ) )
625                    ++setme->peerCount;
626            }
627        }
628
629        qsort (p, poolSize, sizeof(struct tr_refill_piece), compareRefillPiece);
630
631        for( j=0; j<poolSize; ++j )
632            pool[j] = p[j].piece;
633
634        tr_free( p );
635    }
636
637    tr_free( peers );
638
639    *pieceCount = poolSize;
640    return pool;
641}
642
643static uint64_t*
644getPreferredBlocks( Torrent * t, uint64_t * setmeCount )
645{
646    uint32_t i;
647    uint32_t pieceCount;
648    uint32_t * pieces;
649    uint64_t *req, *unreq, *ret, *walk;
650    int reqCount, unreqCount;
651    const tr_torrent * tor = t->tor;
652
653    assert( torrentIsLocked( t ) );
654
655    pieces = getPreferredPieces( t, &pieceCount );
656
657    req = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
658    reqCount = 0;
659    unreq = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
660    unreqCount = 0;
661
662    for( i=0; i<pieceCount; ++i ) {
663        const uint32_t index = pieces[i];
664        const int begin = tr_torPieceFirstBlock( tor, index );
665        const int end = begin + tr_torPieceCountBlocks( tor, (int)index );
666        int block;
667        for( block=begin; block<end; ++block )
668            if( tr_cpBlockIsComplete( tor->completion, block ) )
669                continue;
670            else if( tr_bitfieldHas( t->requested, block ) )
671                req[reqCount++] = block;
672            else
673                unreq[unreqCount++] = block;
674    }
675
676    ret = walk = tr_new( uint64_t, unreqCount + reqCount );
677    memcpy( walk, unreq, sizeof(uint64_t) * unreqCount );
678    walk += unreqCount;
679    memcpy( walk, req, sizeof(uint64_t) * reqCount );
680    walk += reqCount;
681    assert( ( walk - ret ) == ( unreqCount + reqCount ) );
682    *setmeCount = walk - ret;
683
684    tr_free( req );
685    tr_free( unreq );
686    tr_free( pieces );
687
688    return ret;
689}
690
691static int
692refillPulse( void * vtorrent )
693{
694    Torrent * t = vtorrent;
695    tr_torrent * tor = t->tor;
696    uint32_t i;
697    int peerCount;
698    tr_peer ** peers;
699    uint64_t blockCount;
700    uint64_t * blocks;
701
702    if( !t->isRunning )
703        return TRUE;
704    if( tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE )
705        return TRUE;
706
707    torrentLock( t );
708
709    blocks = getPreferredBlocks( t, &blockCount );
710    peers = getConnectedPeers( t, &peerCount );
711
712    for( i=0; peerCount && i<blockCount; ++i )
713    {
714        const int block = blocks[i];
715        const uint32_t index = tr_torBlockPiece( tor, block );
716        const uint32_t begin = (block * tor->blockSize) - (index * tor->info.pieceSize);
717        const uint32_t length = tr_torBlockCountBytes( tor, block );
718        int j;
719        assert( _tr_block( tor, index, begin ) == block );
720        assert( begin < (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
721        assert( (begin + length) <= (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
722
723
724        /* find a peer who can ask for this block */
725        for( j=0; j<peerCount; )
726        {
727            const int val = tr_peerMsgsAddRequest( peers[j]->msgs, index, begin, length );
728            switch( val )
729            {
730                case TR_ADDREQ_FULL: 
731                case TR_ADDREQ_CLIENT_CHOKED:
732                    memmove( peers+j, peers+j+1, sizeof(tr_peer*)*(--peerCount-j) );
733                    break;
734
735                case TR_ADDREQ_MISSING: 
736                    ++j;
737                    break;
738
739                case TR_ADDREQ_OK:
740                    tr_bitfieldAdd( t->requested, block );
741                    j = peerCount;
742                    break;
743
744                default:
745                    assert( 0 && "unhandled value" );
746                    break;
747            }
748        }
749    }
750
751    /* cleanup */
752    tr_free( peers );
753    tr_free( blocks );
754
755    t->refillTimer = NULL;
756
757    torrentUnlock( t );
758    return FALSE;
759}
760
761static void
762broadcastClientHave( Torrent * t, uint32_t index )
763{
764    int i, size;
765    tr_peer ** peers;
766
767    assert( torrentIsLocked( t ) );
768
769    peers = getConnectedPeers( t, &size );
770    for( i=0; i<size; ++i )
771        tr_peerMsgsHave( peers[i]->msgs, index );
772    tr_free( peers );
773}
774
775static void
776broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
777{
778    int i, size;
779    tr_peer ** peers;
780
781    assert( torrentIsLocked( t ) );
782
783    peers = getConnectedPeers( t, &size );
784    for( i=0; i<size; ++i )
785        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
786    tr_free( peers );
787}
788
789static void
790msgsCallbackFunc( void * vpeer, void * vevent, void * vt )
791{
792    tr_peer * peer = vpeer;
793    Torrent * t = (Torrent *) vt;
794    const tr_peermsgs_event * e = (const tr_peermsgs_event *) vevent;
795
796    torrentLock( t );
797
798    switch( e->eventType )
799    {
800        case TR_PEERMSG_NEED_REQ:
801            if( t->refillTimer == NULL )
802                t->refillTimer = tr_timerNew( t->manager->handle,
803                                              refillPulse, t,
804                                              REFILL_PERIOD_MSEC );
805            break;
806
807        case TR_PEERMSG_CLIENT_HAVE:
808            broadcastClientHave( t, e->pieceIndex );
809            tr_torrentRecheckCompleteness( t->tor );
810            break;
811
812        case TR_PEERMSG_PEER_PROGRESS: {
813            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
814            const int peerIsSeed = e->progress >= 1.0;
815            if( peerIsSeed )
816                atom->flags |= ADDED_F_SEED_FLAG;
817            else
818                atom->flags &= ~ADDED_F_SEED_FLAG;
819            break;
820        }
821
822        case TR_PEERMSG_CLIENT_BLOCK:
823            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
824            break;
825
826        case TR_PEERMSG_GOT_ERROR:
827            peer->doPurge = 1;
828            break;
829
830        default:
831            assert(0);
832    }
833
834    torrentUnlock( t );
835}
836
837static void
838ensureAtomExists( Torrent * t, const struct in_addr * addr, uint16_t port, uint8_t flags, uint8_t from )
839{
840    if( !peerIsKnown( t, addr ) )
841    {
842        struct peer_atom * a = tr_new( struct peer_atom, 1 );
843        a->addr = *addr;
844        a->port = port;
845        a->flags = flags;
846        a->from = from;
847        a->time = 0;
848        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr(&a->addr,a->port) );
849        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
850    }
851}
852
853/* FIXME: this is kind of a mess. */
854static void
855myHandshakeDoneCB( tr_handshake    * handshake,
856                   tr_peerIo       * io,
857                   int               isConnected,
858                   const uint8_t   * peer_id,
859                   void            * vmanager )
860{
861    int ok = isConnected;
862    uint16_t port;
863    const struct in_addr * in_addr;
864    tr_peerMgr * manager = (tr_peerMgr*) vmanager;
865    Torrent * t;
866    tr_handshake * ours;
867
868    assert( io != NULL );
869    assert( isConnected==0 || isConnected==1 );
870
871    t = tr_peerIoHasTorrentHash( io )
872        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
873        : NULL;
874
875    if( tr_peerIoIsIncoming ( io ) )
876        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
877                                        handshake, handshakeCompare );
878    else if( t != NULL )
879        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
880                                        handshake, handshakeCompare );
881    else
882        ours = handshake;
883
884    assert( ours != NULL );
885    assert( ours == handshake );
886
887    if( t != NULL )
888        torrentLock( t );
889
890    in_addr = tr_peerIoGetAddress( io, &port );
891
892    if( !t || !t->isRunning )
893    {
894        tr_peerIoFree( io );
895        --manager->connectionCount;
896    }
897    else if( !ok )
898    {
899        /* if we couldn't connect or were snubbed,
900         * the peer's probably not worth remembering. */
901        tr_peer * peer = getExistingPeer( t, in_addr );
902        tr_peerIoFree( io );
903        --manager->connectionCount;
904        if( peer )
905            peer->doPurge = 1;
906    }
907    else /* looking good */
908    {
909        tr_peer * peer = getPeer( t, in_addr );
910        uint16_t port;
911        const struct in_addr * addr = tr_peerIoGetAddress( io,  &port );
912        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
913        if( peer->msgs != NULL ) { /* we already have this peer */
914            tr_peerIoFree( io );
915            --manager->connectionCount;
916        } else {
917            peer->port = port;
918            peer->io = io;
919            peer->msgs = tr_peerMsgsNew( t->tor, peer );
920            tr_free( peer->client );
921            peer->client = peer_id ? tr_clientForId( peer_id ) : NULL;
922            peer->msgsTag = tr_peerMsgsSubscribe( peer->msgs, msgsCallbackFunc, t );
923        }
924    }
925
926    if( t != NULL )
927        torrentUnlock( t );
928}
929
930void
931tr_peerMgrAddIncoming( tr_peerMgr      * manager,
932                       struct in_addr  * addr,
933                       uint16_t          port,
934                       int               socket )
935{
936    managerLock( manager );
937
938    if( getExistingHandshake( manager->incomingHandshakes, addr ) == NULL )
939    {
940        tr_peerIo * io = tr_peerIoNewIncoming( manager->handle, addr, port, socket );
941
942        tr_handshake * handshake = tr_handshakeNew( io,
943                                                    manager->handle->encryptionMode,
944                                                    myHandshakeDoneCB,
945                                                    manager );
946        ++manager->connectionCount;
947
948        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake, handshakeCompare );
949    }
950
951    managerUnlock( manager );
952}
953
954void
955tr_peerMgrAddPex( tr_peerMgr     * manager,
956                  const uint8_t  * torrentHash,
957                  int              from,
958                  const tr_pex   * pex,
959                  int              pexCount )
960{
961    Torrent * t;
962    const tr_pex * end;
963
964    managerLock( manager );
965
966    t = getExistingTorrent( manager, torrentHash );
967    for( end=pex+pexCount; pex!=end; ++pex )
968        ensureAtomExists( t, &pex->in_addr, pex->port, pex->flags, from );
969
970    managerUnlock( manager );
971}
972
973void
974tr_peerMgrAddPeers( tr_peerMgr    * manager,
975                    const uint8_t * torrentHash,
976                    int             from,
977                    const uint8_t * peerCompact,
978                    int             peerCount )
979{
980    int i;
981    const uint8_t * walk = peerCompact;
982    Torrent * t;
983
984    managerLock( manager );
985
986    t = getExistingTorrent( manager, torrentHash );
987    for( i=0; t!=NULL && i<peerCount; ++i )
988    {
989        struct in_addr addr;
990        uint16_t port;
991        memcpy( &addr, walk, 4 ); walk += 4;
992        memcpy( &port, walk, 2 ); walk += 2;
993        ensureAtomExists( t, &addr, port, 0, from );
994    }
995
996    managerUnlock( manager );
997}
998
999/**
1000***
1001**/
1002
1003void
1004tr_peerMgrSetBlame( tr_peerMgr     * manager UNUSED,
1005                    const uint8_t  * torrentHash UNUSED,
1006                    int              pieceIndex UNUSED,
1007                    int              success UNUSED )
1008{
1009    /*fprintf( stderr, "FIXME: tr_peerMgrSetBlame\n" );*/
1010}
1011
1012int
1013tr_pexCompare( const void * va, const void * vb )
1014{
1015    const tr_pex * a = (const tr_pex *) va;
1016    const tr_pex * b = (const tr_pex *) vb;
1017    int i = memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
1018    if( i ) return i;
1019    if( a->port < b->port ) return -1;
1020    if( a->port > b->port ) return 1;
1021    return 0;
1022}
1023
1024int tr_pexCompare( const void * a, const void * b );
1025
1026static int
1027peerPrefersCrypto( const tr_peer * peer )
1028{
1029    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1030        return TRUE;
1031
1032    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1033        return FALSE;
1034
1035    return tr_peerIoIsEncrypted( peer->io );
1036};
1037
1038int
1039tr_peerMgrGetPeers( tr_peerMgr      * manager,
1040                    const uint8_t   * torrentHash,
1041                    tr_pex         ** setme_pex )
1042{
1043    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1044    int i, peerCount;
1045    const tr_peer ** peers;
1046    tr_pex * pex;
1047    tr_pex * walk;
1048
1049    torrentLock( (Torrent*)t );
1050
1051    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1052    pex = walk = tr_new( tr_pex, peerCount );
1053
1054    for( i=0; i<peerCount; ++i, ++walk )
1055    {
1056        const tr_peer * peer = peers[i];
1057
1058        walk->in_addr = peer->in_addr;
1059
1060        walk->port = peer->port;
1061
1062        walk->flags = 0;
1063        if( peerPrefersCrypto(peer) )  walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1064        if( peer->progress >= 1.0 )    walk->flags |= ADDED_F_SEED_FLAG;
1065    }
1066
1067    assert( ( walk - pex ) == peerCount );
1068    qsort( pex, peerCount, sizeof(tr_pex), tr_pexCompare );
1069    *setme_pex = pex;
1070
1071    torrentUnlock( (Torrent*)t );
1072
1073    return peerCount;
1074}
1075
1076static int reconnectPulse( void * vtorrent );
1077static int rechokePulse( void * vtorrent );
1078
1079void
1080tr_peerMgrStartTorrent( tr_peerMgr     * manager,
1081                        const uint8_t  * torrentHash )
1082{
1083    Torrent * t;
1084
1085    managerLock( manager );
1086
1087    t = getExistingTorrent( manager, torrentHash );
1088
1089    assert( t != NULL );
1090    assert( !t->isRunning );
1091    assert( t->reconnectTimer == NULL );
1092    assert( t->rechokeTimer == NULL );
1093
1094    t->isRunning = 1;
1095    t->reconnectTimer = tr_timerNew( t->manager->handle, reconnectPulse, t, RECONNECT_PERIOD_MSEC );
1096    t->rechokeTimer = tr_timerNew( t->manager->handle, rechokePulse, t, RECHOKE_PERIOD_MSEC );
1097
1098    managerUnlock( manager );
1099}
1100
1101static void
1102stopTorrent( Torrent * t )
1103{
1104    assert( torrentIsLocked( t ) );
1105
1106    t->isRunning = 0;
1107    tr_timerFree( &t->rechokeTimer );
1108    tr_timerFree( &t->reconnectTimer );
1109
1110    /* disconnect the peers. */
1111    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)freePeer );
1112    tr_ptrArrayClear( t->peers );
1113
1114    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1115     * which removes the handshake from t->outgoingHandshakes... */
1116    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1117        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1118}
1119void
1120tr_peerMgrStopTorrent( tr_peerMgr     * manager,
1121                       const uint8_t  * torrentHash)
1122{
1123    managerLock( manager );
1124
1125    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1126
1127    managerUnlock( manager );
1128}
1129
1130void
1131tr_peerMgrAddTorrent( tr_peerMgr * manager,
1132                      tr_torrent * tor )
1133{
1134    Torrent * t;
1135
1136    managerLock( manager );
1137
1138    assert( tor != NULL );
1139    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1140
1141    t = torrentConstructor( manager, tor );
1142    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1143
1144    managerUnlock( manager );
1145}
1146
1147void
1148tr_peerMgrRemoveTorrent( tr_peerMgr     * manager,
1149                         const uint8_t  * torrentHash )
1150{
1151    Torrent * t;
1152
1153    managerLock( manager );
1154
1155    t = getExistingTorrent( manager, torrentHash );
1156    assert( t != NULL );
1157    stopTorrent( t );
1158    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1159    torrentDestructor( t );
1160
1161    managerUnlock( manager );
1162}
1163
1164void
1165tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1166                               const uint8_t    * torrentHash,
1167                               int8_t           * tab,
1168                               int                tabCount )
1169{
1170    int i;
1171    const Torrent * t;
1172    const tr_torrent * tor;
1173    float interval;
1174
1175    managerLock( (tr_peerMgr*)manager );
1176
1177    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1178    tor = t->tor;
1179    interval = tor->info.pieceCount / (float)tabCount;
1180
1181    memset( tab, 0, tabCount );
1182
1183    for( i=0; i<tabCount; ++i )
1184    {
1185        const int piece = i * interval;
1186
1187        if( tor == NULL )
1188            tab[i] = 0;
1189        else if( tr_cpPieceIsComplete( tor->completion, piece ) )
1190            tab[i] = -1;
1191        else {
1192            int j, peerCount;
1193            const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1194            for( j=0; j<peerCount; ++j )
1195                if( tr_bitfieldHas( peers[j]->have, i ) )
1196                    ++tab[i];
1197        }
1198    }
1199
1200    managerUnlock( (tr_peerMgr*)manager );
1201}
1202
1203/* Returns the pieces that we and/or a connected peer has */
1204tr_bitfield*
1205tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1206                        const uint8_t    * torrentHash )
1207{
1208    int i, size;
1209    const Torrent * t;
1210    const tr_peer ** peers;
1211    tr_bitfield * pieces;
1212
1213    managerLock( (tr_peerMgr*)manager );
1214
1215    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1216    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1217    pieces = tr_bitfieldDup( tr_cpPieceBitfield( t->tor->completion ) );
1218    for( i=0; i<size; ++i )
1219        if( peers[i]->io != NULL )
1220            tr_bitfieldAnd( pieces, peers[i]->have );
1221
1222    managerUnlock( (tr_peerMgr*)manager );
1223    return pieces;
1224}
1225
1226void
1227tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1228                        const uint8_t    * torrentHash,
1229                        int              * setmePeersKnown,
1230                        int              * setmePeersConnected,
1231                        int              * setmePeersSendingToUs,
1232                        int              * setmePeersGettingFromUs,
1233                        int              * setmePeersFrom )
1234{
1235    int i, size;
1236    const Torrent * t;
1237    const tr_peer ** peers;
1238
1239    managerLock( (tr_peerMgr*)manager );
1240
1241    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1242    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1243
1244    *setmePeersKnown          = tr_ptrArraySize( t->pool );
1245    *setmePeersConnected      = 0;
1246    *setmePeersSendingToUs    = 0;
1247    *setmePeersGettingFromUs  = 0;
1248
1249    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1250        setmePeersFrom[i] = 0;
1251
1252    for( i=0; i<size; ++i )
1253    {
1254        const tr_peer * peer = peers[i];
1255        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1256
1257        if( peer->io == NULL ) /* not connected */
1258            continue;
1259
1260        ++*setmePeersConnected;
1261
1262        ++setmePeersFrom[atom->from];
1263
1264        if( tr_rcRate( peer->rateToPeer ) > 0.01 )
1265            ++*setmePeersGettingFromUs;
1266
1267        if( tr_rcRate( peer->rateToClient ) > 0.01 )
1268            ++*setmePeersSendingToUs;
1269    }
1270
1271    managerUnlock( (tr_peerMgr*)manager );
1272}
1273
1274struct tr_peer_stat *
1275tr_peerMgrPeerStats( const tr_peerMgr  * manager,
1276                     const uint8_t     * torrentHash,
1277                     int               * setmeCount UNUSED )
1278{
1279    int i, size;
1280    const Torrent * t;
1281    const tr_peer ** peers;
1282    tr_peer_stat * ret;
1283
1284    assert( manager != NULL );
1285    managerLock( (tr_peerMgr*)manager );
1286
1287    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1288    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1289
1290    ret = tr_new0( tr_peer_stat, size );
1291
1292    for( i=0; i<size; ++i )
1293    {
1294        const tr_peer * peer = peers[i];
1295        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1296        tr_peer_stat * stat = ret + i;
1297
1298        tr_netNtop( &peer->in_addr, stat->addr, sizeof(stat->addr) );
1299        stat->port             = peer->port;
1300        stat->from             = atom->from;
1301        stat->client           = tr_strdup( peer->client ? peer->client : "" );
1302        stat->progress         = peer->progress;
1303        stat->isEncrypted      = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1304        stat->uploadToRate     = tr_rcRate( peer->rateToPeer );
1305        stat->downloadFromRate = tr_rcRate( peer->rateToClient );
1306        stat->isDownloading    = stat->uploadToRate > 0.01;
1307        stat->isUploading      = stat->downloadFromRate > 0.01;
1308    }
1309
1310    *setmeCount = size;
1311
1312    managerUnlock( (tr_peerMgr*)manager );
1313    return ret;
1314}
1315
1316/**
1317***
1318**/
1319
1320typedef struct
1321{
1322    tr_peer * peer;
1323    float rate;
1324    int randomKey;
1325    int preferred;
1326    int doUnchoke;
1327}
1328ChokeData;
1329
1330static int
1331compareChoke( const void * va, const void * vb )
1332{
1333    const ChokeData * a = va;
1334    const ChokeData * b = vb;
1335
1336    if( a->preferred != b->preferred )
1337        return a->preferred ? -1 : 1;
1338
1339    if( a->preferred )
1340    {
1341        if( a->rate > b->rate ) return -1;
1342        if( a->rate < b->rate ) return 1;
1343        return 0;
1344    }
1345    else
1346    {
1347        return a->randomKey - b->randomKey;
1348    }
1349}
1350
1351static int
1352clientIsSnubbedBy( const tr_peer * peer )
1353{
1354    assert( peer != NULL );
1355
1356    return peer->peerSentPieceDataAt < (time(NULL) - SNUBBED_SEC);
1357}
1358
1359/**
1360***
1361**/
1362
1363static void
1364rechokeLeech( Torrent * t )
1365{
1366    int i, peerCount, size=0, unchoked=0;
1367    const time_t ignorePeersNewerThan = time(NULL) - MIN_CHOKE_PERIOD_SEC;
1368    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1369    ChokeData * choke = tr_new0( ChokeData, peerCount );
1370
1371    assert( torrentIsLocked( t ) );
1372   
1373    /* sort the peers by preference and rate */
1374    for( i=0; i<peerCount; ++i )
1375    {
1376        tr_peer * peer = peers[i];
1377        ChokeData * node;
1378        if( peer->chokeChangedAt > ignorePeersNewerThan )
1379            continue;
1380
1381        node = &choke[size++];
1382        node->peer = peer;
1383        node->preferred = peer->peerIsInterested && !clientIsSnubbedBy(peer);
1384        node->randomKey = tr_rand( INT_MAX );
1385        node->rate = (3*tr_rcRate(peer->rateToPeer))
1386                   + (1*tr_rcRate(peer->rateToClient));
1387    }
1388
1389    qsort( choke, size, sizeof(ChokeData), compareChoke );
1390
1391    for( i=0; i<size && i<NUM_UNCHOKED_PEERS_PER_TORRENT; ++i ) {
1392        choke[i].doUnchoke = 1;
1393        ++unchoked;
1394    }
1395
1396    for( ; i<size; ++i ) {
1397        choke[i].doUnchoke = 1;
1398        ++unchoked;
1399        if( choke[i].peer->peerIsInterested )
1400            break;
1401    }
1402
1403    for( i=0; i<size; ++i )
1404        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1405
1406    /* cleanup */
1407    tr_free( choke );
1408    tr_free( peers );
1409}
1410
1411static void
1412rechokeSeed( Torrent * t )
1413{
1414    int i, size;
1415    tr_peer ** peers;
1416
1417    assert( torrentIsLocked( t ) );
1418
1419    peers = getConnectedPeers( t, &size );
1420
1421    /* FIXME */
1422    for( i=0; i<size; ++i )
1423        tr_peerMsgsSetChoke( peers[i]->msgs, FALSE );
1424
1425    tr_free( peers );
1426}
1427
1428static int
1429rechokePulse( void * vtorrent )
1430{
1431    Torrent * t = vtorrent;
1432    torrentLock( t );
1433
1434    const int done = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1435    if( done )
1436        rechokeLeech( vtorrent );
1437    else
1438        rechokeSeed( vtorrent );
1439
1440    torrentUnlock( t );
1441    return TRUE;
1442}
1443
1444/***
1445****
1446****
1447****
1448***/
1449
1450struct tr_connection
1451{
1452    tr_peer * peer;
1453    double throughput;
1454};
1455
1456#define LAISSEZ_FAIRE_PERIOD_SECS 90
1457
1458static int
1459compareConnections( const void * va, const void * vb )
1460{
1461    const struct tr_connection * a = va;
1462    const struct tr_connection * b = vb;
1463    if( a->throughput < b->throughput ) return -1;
1464    if( a->throughput > b->throughput ) return 1;
1465    return 0;
1466}
1467
1468static struct tr_connection *
1469getWeakConnections( Torrent * t, int * setmeSize )
1470{
1471    int i, insize, outsize;
1472    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( t->peers, &insize );
1473    struct tr_connection * ret = tr_new( struct tr_connection, insize );
1474    const int clientIsSeed = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1475    const time_t now = time( NULL );
1476
1477    assert( torrentIsLocked( t ) );
1478
1479    for( i=outsize=0; i<insize; ++i )
1480    {
1481        tr_peer * peer = peers[i];
1482        int isWeak;
1483        const int peerIsSeed = peer->progress >= 1.0;
1484        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1485        const double throughput = (3*tr_rcRate(peer->rateToPeer))
1486                                + (1*tr_rcRate(peer->rateToClient));
1487
1488        assert( atom != NULL );
1489
1490        if( throughput >= 3 )
1491            isWeak = FALSE;
1492        else if( peer->doPurge )
1493            isWeak = TRUE;
1494        else if( peerIsSeed && clientIsSeed )
1495            isWeak = t->tor->pexDisabled || (now-atom->time>=30);
1496        else if( ( now - atom->time ) < LAISSEZ_FAIRE_PERIOD_SECS )
1497            isWeak = FALSE;
1498        else
1499            isWeak = now - peer->pieceDataActivityDate > 180;
1500
1501        if( isWeak )
1502        {
1503            ret[outsize].peer = peer;
1504            ret[outsize].throughput = throughput;
1505            ++outsize;
1506        }
1507    }
1508
1509    qsort( ret, outsize, sizeof(struct tr_connection), compareConnections );
1510    *setmeSize = outsize;
1511    return ret;
1512}
1513
1514static int
1515compareAtomByTime( const void * va, const void * vb )
1516{
1517    const struct peer_atom * a = * (const struct peer_atom**) va;
1518    const struct peer_atom * b = * (const struct peer_atom**) vb;
1519    if( a->time < b->time ) return -1;
1520    if( a->time > b->time ) return 1;
1521    return 0;
1522}
1523
1524static struct peer_atom **
1525getPeerCandidates( Torrent * t, int * setmeSize )
1526{
1527    int i, insize, outsize;
1528    struct peer_atom ** atoms;
1529    struct peer_atom ** ret;
1530    const time_t now = time( NULL );
1531    const int seed = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1532
1533    assert( torrentIsLocked( t ) );
1534
1535    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &insize );
1536    ret = tr_new( struct peer_atom*, insize );
1537    for( i=outsize=0; i<insize; ++i )
1538    {
1539        struct peer_atom * atom = atoms[i];
1540
1541        /* we don't need two connections to the same peer... */
1542        if( peerIsInUse( t, &atom->addr ) ) {
1543            tordbg( t, "RECONNECT peer %d (%s) is in use...\n", i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1544            continue;
1545        }
1546
1547        /* no need to connect if we're both seeds... */
1548        if( seed && (atom->flags & ADDED_F_SEED_FLAG) ) {
1549            tordbg( t, "RECONNECT peer %d (%s) is a seed and so are we...\n", i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1550            continue;
1551        }
1552
1553        /* if we used this peer recently, give someone else a turn */
1554        if( ( now - atom->time ) <  LAISSEZ_FAIRE_PERIOD_SECS ) {
1555            tordbg( t, "RECONNECT peer %d (%s) is in its grace period...\n", i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1556            continue;
1557        }
1558
1559        ret[outsize++] = atom;
1560    }
1561
1562    qsort( ret, outsize, sizeof(struct peer_atom*), compareAtomByTime );
1563    *setmeSize = outsize;
1564    return ret;
1565}
1566
1567static int
1568reconnectPulse( void * vtorrent )
1569{
1570    Torrent * t = vtorrent;
1571
1572    torrentLock( t );
1573
1574    if( !t->isRunning )
1575    {
1576        removeAllPeers( t );
1577    }
1578    else
1579    {
1580        int i, nCandidates, nWeak, nAdd;
1581        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
1582        struct tr_connection * connections = getWeakConnections( t, &nWeak );
1583        const int peerCount = tr_ptrArraySize( t->peers );
1584
1585        tordbg( t, "RECONNECT pulse for [%s]: %d weak connections, %d connection candidates, %d atoms, max per pulse is %d\n",
1586                 t->tor->info.name, nWeak, nCandidates, tr_ptrArraySize(t->pool), (int)MAX_RECONNECTIONS_PER_PULSE );
1587
1588        for( i=0; i<nWeak; ++i )
1589            tordbg( t, "connection #%d: %s @ %.2f\n", i+1,
1590                     tr_peerIoAddrStr( &connections[i].peer->in_addr, connections[i].peer->port ), connections[i].throughput );
1591
1592        /* disconnect some peers */
1593        for( i=0; i<nWeak; ++i )
1594            removePeer( t, connections[i].peer );
1595
1596        /* add some new ones */
1597        nAdd = MAX_CONNECTED_PEERS_PER_TORRENT - peerCount;
1598        for( i=0; i<nAdd && i<nCandidates && i<MAX_RECONNECTIONS_PER_PULSE; ++i )
1599        {
1600            tr_peerMgr * mgr = t->manager;
1601
1602            struct peer_atom * atom = candidates[i];
1603
1604            tr_peerIo * io = tr_peerIoNewOutgoing( mgr->handle, &atom->addr, atom->port, t->hash );
1605
1606            tr_handshake * handshake = tr_handshakeNew( io,
1607                                                        mgr->handle->encryptionMode,
1608                                                        myHandshakeDoneCB,
1609                                                        mgr );
1610            ++mgr->connectionCount;
1611
1612            assert( tr_peerIoGetTorrentHash( io ) != NULL );
1613
1614            tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake, handshakeCompare );
1615
1616            atom->time = time( NULL );
1617        }
1618
1619        /* cleanup */
1620        tr_free( connections );
1621        tr_free( candidates );
1622    }
1623
1624    torrentUnlock( t );
1625    return TRUE;
1626}
Note: See TracBrowser for help on using the repository browser.