source: trunk/libtransmission/peer-mgr.c @ 3309

Last change on this file since 3309 was 3309, checked in by charles, 15 years ago

try to connect to peers faster when a torrent is initially started.

  • Property svn:keywords set to Date Rev Author Id
File size: 43.0 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 3309 2007-10-07 04:14:58Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17#include <limits.h> /* INT_MAX */
18
19#include <sys/types.h> /* event.h needs this */
20#include <event.h>
21
22#include "transmission.h"
23#include "clients.h"
24#include "completion.h"
25#include "crypto.h"
26#include "handshake.h"
27#include "net.h"
28#include "peer-io.h"
29#include "peer-mgr.h"
30#include "peer-mgr-private.h"
31#include "peer-msgs.h"
32#include "platform.h"
33#include "ptrarray.h"
34#include "ratecontrol.h"
35#include "shared.h"
36#include "trevent.h"
37#include "utils.h"
38
39enum
40{
41    /* how frequently to change which peers are choked */
42    RECHOKE_PERIOD_MSEC = (1000),
43
44    /* how frequently to decide which peers live and die */
45    RECONNECT_PERIOD_MSEC = (5 * 1000),
46
47    /* how frequently to refill peers' request lists */
48    REFILL_PERIOD_MSEC = 666,
49
50    /* don't change a peer's choke status more often than this */
51    MIN_CHOKE_PERIOD_SEC = 10,
52
53    /* following the BT spec, we consider ourselves `snubbed' if
54     * we're we don't get piece data from a peer in this long */
55    SNUBBED_SEC = 60,
56
57    /* this is arbitrary and, hopefully, temporary until we come up
58     * with a better idea for managing the connection limits */
59    MAX_CONNECTED_PEERS_PER_TORRENT = 60,
60
61    /* how many peers to unchoke per-torrent. */
62    /* FIXME: make this user-configurable? */
63    NUM_UNCHOKED_PEERS_PER_TORRENT = 12, /* arbitrary */
64
65    /* set this too high and there will be a lot of churn.
66     * set it too low and you'll get peers too slowly */
67    MAX_RECONNECTIONS_PER_PULSE = 8,
68
69    /* corresponds to ut_pex's added.f flags */
70    ADDED_F_ENCRYPTION_FLAG = 1,
71
72    /* corresponds to ut_pex's added.f flags */
73    ADDED_F_SEED_FLAG = 2
74};
75
76
77/**
78***
79**/
80
81/* We keep one of these for every peer we know about, whether
82 * it's connected or not, so the struct must be small.
83 * When our current connections underperform, we dip back
84 * into this list for new ones. */
85struct peer_atom
86{   
87    uint8_t from;
88    uint8_t flags; /* these match the added_f flags */
89    uint16_t port;
90    struct in_addr addr; 
91    time_t time;
92};
93
94typedef struct
95{
96    uint8_t hash[SHA_DIGEST_LENGTH];
97    tr_ptrArray * outgoingHandshakes; /* tr_handshake */
98    tr_ptrArray * pool; /* struct peer_atom */
99    tr_ptrArray * peers; /* tr_peer */
100    tr_timer * reconnectTimer;
101    tr_timer * rechokeTimer;
102    tr_timer * refillTimer;
103    tr_torrent * tor;
104    tr_bitfield * requested;
105
106    unsigned int isRunning : 1;
107
108    struct tr_peerMgr * manager;
109}
110Torrent;
111
112struct tr_peerMgr
113{
114    int connectionCount;
115    tr_handle * handle;
116    tr_ptrArray * torrents; /* Torrent */
117    tr_ptrArray * incomingHandshakes; /* tr_handshake */
118};
119
120/**
121***
122**/
123
124static void
125myDebug( const char * file, int line, const Torrent * t, const char * fmt, ... )
126{
127    FILE * fp = tr_getLog( );
128    if( fp != NULL )
129    {
130        va_list args;
131        struct evbuffer * buf = evbuffer_new( );
132        char timestr[64];
133        evbuffer_add_printf( buf, "[%s] %s: ",
134                             tr_getLogTimeStr( timestr, sizeof(timestr) ),
135                             t->tor->info.name );
136        va_start( args, fmt );
137        evbuffer_add_vprintf( buf, fmt, args );
138        va_end( args );
139        evbuffer_add_printf( buf, " (%s:%d)\n", file, line );
140
141        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
142        evbuffer_free( buf );
143    }
144}
145
146#define tordbg(t, fmt...) myDebug(__FILE__, __LINE__, t, ##fmt )
147
148/**
149***
150**/
151
152static void
153managerLock( struct tr_peerMgr * manager )
154{
155    tr_globalLock( manager->handle );
156}
157static void
158managerUnlock( struct tr_peerMgr * manager )
159{
160    tr_globalUnlock( manager->handle );
161}
162static void
163torrentLock( Torrent * torrent )
164{
165    managerLock( torrent->manager );
166}
167static void
168torrentUnlock( Torrent * torrent )
169{
170    managerUnlock( torrent->manager );
171}
172static int
173torrentIsLocked( const Torrent * t )
174{
175    return tr_globalIsLocked( t->manager->handle );
176}
177
178/**
179***
180**/
181
182static int
183compareAddresses( const struct in_addr * a, const struct in_addr * b )
184{
185    return tr_compareUint32( a->s_addr, b->s_addr );
186}
187
188static int
189handshakeCompareToAddr( const void * va, const void * vb )
190{
191    const tr_handshake * a = va;
192    return compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
193}
194
195static int
196handshakeCompare( const void * a, const void * b )
197{
198    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
199}
200
201static tr_handshake*
202getExistingHandshake( tr_ptrArray * handshakes, const struct in_addr * in_addr )
203{
204    return tr_ptrArrayFindSorted( handshakes,
205                                  in_addr,
206                                  handshakeCompareToAddr );
207}
208
209static int
210comparePeerAtomToAddress( const void * va, const void * vb )
211{
212    const struct peer_atom * a = va;
213    return compareAddresses( &a->addr, vb );
214}
215
216static int
217comparePeerAtoms( const void * va, const void * vb )
218{
219    const struct peer_atom * b = vb;
220    return comparePeerAtomToAddress( va, &b->addr );
221}
222
223/**
224***
225**/
226
227static int
228torrentCompare( const void * va, const void * vb )
229{
230    const Torrent * a = va;
231    const Torrent * b = vb;
232    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
233}
234
235static int
236torrentCompareToHash( const void * va, const void * vb )
237{
238    const Torrent * a = va;
239    const uint8_t * b_hash = vb;
240    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
241}
242
243static Torrent*
244getExistingTorrent( tr_peerMgr * manager, const uint8_t * hash )
245{
246    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
247                                             hash,
248                                             torrentCompareToHash );
249}
250
251static int
252peerCompare( const void * va, const void * vb )
253{
254    const tr_peer * a = va;
255    const tr_peer * b = vb;
256    return compareAddresses( &a->in_addr, &b->in_addr );
257}
258
259static int
260peerCompareToAddr( const void * va, const void * vb )
261{
262    const tr_peer * a = va;
263    return compareAddresses( &a->in_addr, vb );
264}
265
266static tr_peer*
267getExistingPeer( Torrent * torrent, const struct in_addr * in_addr )
268{
269    assert( torrentIsLocked( torrent ) );
270    assert( in_addr != NULL );
271
272    return (tr_peer*) tr_ptrArrayFindSorted( torrent->peers,
273                                             in_addr,
274                                             peerCompareToAddr );
275}
276
277static struct peer_atom*
278getExistingAtom( const Torrent * t, const struct in_addr * addr )
279{
280    assert( torrentIsLocked( t ) );
281    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
282}
283
284static int
285peerIsKnown( const Torrent * t, const struct in_addr * addr )
286{
287    return getExistingAtom( t, addr ) != NULL;
288}
289
290static int
291peerIsInUse( const Torrent * ct, const struct in_addr * addr )
292{
293    Torrent * t = (Torrent*) ct;
294
295    assert( torrentIsLocked ( t ) );
296
297    return getExistingPeer( t, addr )
298        || getExistingHandshake( t->outgoingHandshakes, addr )
299        || getExistingHandshake( t->manager->incomingHandshakes, addr );
300}
301
302static tr_peer*
303peerConstructor( const struct in_addr * in_addr )
304{
305    tr_peer * p;
306    p = tr_new0( tr_peer, 1 );
307    p->rcToClient = tr_rcInit( );
308    p->rcToPeer = tr_rcInit( );
309    memcpy( &p->in_addr, in_addr, sizeof(struct in_addr) );
310    return p;
311}
312
313static tr_peer*
314getPeer( Torrent * torrent, const struct in_addr * in_addr )
315{
316    tr_peer * peer;
317
318    assert( torrentIsLocked( torrent ) );
319
320    peer = getExistingPeer( torrent, in_addr );
321
322    if( peer == NULL ) {
323        peer = peerConstructor( in_addr );
324        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
325    }
326
327    return peer;
328}
329
330static void
331peerDestructor( tr_peer * peer )
332{
333    assert( peer != NULL );
334    assert( peer->msgs != NULL );
335
336    tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
337    tr_peerMsgsFree( peer->msgs );
338
339    tr_peerIoFree( peer->io );
340
341    tr_bitfieldFree( peer->have );
342    tr_bitfieldFree( peer->blame );
343    tr_bitfieldFree( peer->banned );
344    tr_rcClose( peer->rcToClient );
345    tr_rcClose( peer->rcToPeer );
346    tr_free( peer->client );
347    tr_free( peer );
348}
349
350static void
351removePeer( Torrent * t, tr_peer * peer )
352{
353    tr_peer * removed;
354    struct peer_atom * atom;
355
356    assert( torrentIsLocked( t ) );
357
358    atom = getExistingAtom( t, &peer->in_addr );
359    assert( atom != NULL );
360    atom->time = time( NULL );
361
362    removed = tr_ptrArrayRemoveSorted  ( t->peers, peer, peerCompare );
363    assert( removed == peer );
364    peerDestructor( removed );
365}
366
367static void
368removeAllPeers( Torrent * t )
369{
370    while( !tr_ptrArrayEmpty( t->peers ) )
371        removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
372}
373
374static void
375torrentDestructor( Torrent * t )
376{
377    uint8_t hash[SHA_DIGEST_LENGTH];
378
379    assert( t != NULL );
380    assert( !t->isRunning );
381    assert( t->peers != NULL );
382    assert( torrentIsLocked( t ) );
383    assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
384    assert( tr_ptrArrayEmpty( t->peers ) );
385
386    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
387
388    tr_timerFree( &t->reconnectTimer );
389    tr_timerFree( &t->rechokeTimer );
390    tr_timerFree( &t->refillTimer );
391
392    tr_bitfieldFree( t->requested );
393    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
394    tr_ptrArrayFree( t->outgoingHandshakes, NULL );
395    tr_ptrArrayFree( t->peers, NULL );
396
397    tr_free( t );
398}
399
400static Torrent*
401torrentConstructor( tr_peerMgr * manager, tr_torrent * tor )
402{
403    Torrent * t;
404
405    t = tr_new0( Torrent, 1 );
406    t->manager = manager;
407    t->tor = tor;
408    t->pool = tr_ptrArrayNew( );
409    t->peers = tr_ptrArrayNew( );
410    t->outgoingHandshakes = tr_ptrArrayNew( );
411    t->requested = tr_bitfieldNew( tor->blockCount );
412    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
413
414    return t;
415}
416
417/**
418***
419**/
420
421struct tr_bitfield *
422tr_peerMgrGenerateAllowedSet( const uint32_t         setCount,
423                              const uint32_t         pieceCount,
424                              const uint8_t          infohash[20],
425                              const struct in_addr * ip )
426{
427    /* This has been checked against the spec example implementation. Feeding it with :
428    setCount = 9, pieceCount = 1313, infohash = Oxaa,0xaa,...0xaa, ip = 80.4.4.200
429generate :
430    1059, 431, 808, 1217, 287, 376, 1188, 353, 508
431    but since we're storing in a bitfield, it won't be in this order... */
432    /* TODO : We should translate link-local IPv4 adresses to external IP,
433     * so that being on same local network gives us the same allowed pieces */
434   
435    printf( "%d piece allowed fast set for torrent with %d pieces and hex infohash\n", setCount, pieceCount );
436    printf( "%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x for node with IP %s:\n",
437            infohash[0], infohash[1], infohash[2], infohash[3], infohash[4], infohash[5], infohash[6], infohash[7], infohash[8], infohash[9],
438            infohash[10], infohash[11], infohash[12], infohash[13], infohash[14], infohash[15], infohash[16], infohash[7], infohash[18], infohash[19],
439            inet_ntoa( *ip ) );
440   
441    uint8_t *seed = malloc(4 + SHA_DIGEST_LENGTH);
442    char buf[4];
443    uint32_t allowedPieceCount = 0;
444    tr_bitfield_t * ret;
445   
446    ret = tr_bitfieldNew( pieceCount );
447   
448    /* We need a seed based on most significant bytes of peer address
449        concatenated with torrent's infohash */
450    *(uint32_t*)buf = ntohl( htonl(ip->s_addr) & 0xffffff00 );
451   
452    memcpy( seed, &buf, 4 );
453    memcpy( seed + 4, infohash, SHA_DIGEST_LENGTH );
454   
455    tr_sha1( seed, seed, 4 + SHA_DIGEST_LENGTH, NULL );
456   
457    while ( allowedPieceCount < setCount )
458    {
459        int i;
460        for ( i = 0 ; i < 5 && allowedPieceCount < setCount ; i++ )
461        {
462            /* We generate indices from 4-byte chunks of the seed */
463            uint32_t j = i * 4;
464            uint32_t y = ntohl( *(uint32_t*)(seed + j) );
465            uint32_t index = y % pieceCount;
466           
467            if ( !tr_bitfieldHas( ret, index ) )
468            {
469                tr_bitfieldAdd( ret, index );
470                allowedPieceCount++;
471            }
472        }
473        /* We randomize the seed, in case we need to iterate more */
474        tr_sha1( seed, seed, SHA_DIGEST_LENGTH, NULL );
475    }
476    tr_free( seed );
477   
478    return ret;
479}
480
481tr_peerMgr*
482tr_peerMgrNew( tr_handle * handle )
483{
484    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
485    m->handle = handle;
486    m->torrents = tr_ptrArrayNew( );
487    m->incomingHandshakes = tr_ptrArrayNew( );
488    return m;
489}
490
491void
492tr_peerMgrFree( tr_peerMgr * manager )
493{
494    managerLock( manager );
495
496    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
497     * the item from manager->handshakes, so this is a little roundabout... */
498    while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
499        tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
500    tr_ptrArrayFree( manager->incomingHandshakes, NULL );
501
502    /* free the torrents. */
503    tr_ptrArrayFree( manager->torrents, (PtrArrayForeachFunc)torrentDestructor );
504
505    managerUnlock( manager );
506    tr_free( manager );
507}
508
509static tr_peer**
510getConnectedPeers( Torrent * t, int * setmeCount )
511{
512    int i, peerCount, connectionCount;
513    tr_peer **peers;
514    tr_peer **ret;
515
516    assert( torrentIsLocked( t ) );
517
518    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
519    ret = tr_new( tr_peer*, peerCount );
520
521    for( i=connectionCount=0; i<peerCount; ++i )
522        if( peers[i]->msgs != NULL )
523            ret[connectionCount++] = peers[i];
524
525    *setmeCount = connectionCount;
526    return ret;
527}
528
529/***
530****  Refill
531***/
532
533struct tr_refill_piece
534{
535    tr_priority_t priority;
536    uint16_t random;
537    uint32_t piece;
538    uint32_t peerCount;
539    uint32_t fastAllowed;
540};
541
542static int
543compareRefillPiece (const void * aIn, const void * bIn)
544{
545    const struct tr_refill_piece * a = aIn;
546    const struct tr_refill_piece * b = bIn;
547
548    /* if one piece has a higher priority, it goes first */
549    if (a->priority != b->priority)
550        return a->priority > b->priority ? -1 : 1;
551
552    /* otherwise if one has fewer peers, it goes first */
553    if (a->peerCount != b->peerCount)
554        return a->peerCount < b->peerCount ? -1 : 1;
555   
556    /* otherwise if one *might be* fastallowed to us */
557    if (a->fastAllowed != b->fastAllowed)
558        return a->fastAllowed < b->fastAllowed ? -1 : 1;
559
560    /* otherwise go with our random seed */
561    return tr_compareUint16( a->random, b->random );
562}
563
564static int
565isPieceInteresting( const tr_torrent  * tor,
566                    int                 piece )
567{
568    if( tor->info.pieces[piece].dnd ) /* we don't want it */
569        return 0;
570
571    if( tr_cpPieceIsComplete( tor->completion, piece ) ) /* we have it */
572        return 0;
573
574    return 1;
575}
576
577static uint32_t*
578getPreferredPieces( Torrent     * t,
579                    uint32_t    * pieceCount )
580{
581    const tr_torrent * tor = t->tor;
582    const tr_info * inf = &tor->info;
583    int i;
584    uint32_t poolSize = 0;
585    uint32_t * pool = tr_new( uint32_t, inf->pieceCount );
586    int peerCount;
587    tr_peer** peers;
588
589    assert( torrentIsLocked( t ) );
590
591    peers = getConnectedPeers( t, &peerCount );
592
593    for( i=0; i<inf->pieceCount; ++i )
594        if( isPieceInteresting( tor, i ) )
595            pool[poolSize++] = i;
596
597    /* sort the pool from most interesting to least... */
598    if( poolSize > 1 )
599    {
600        uint32_t j;
601        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
602
603        for( j=0; j<poolSize; ++j )
604        {
605            int k;
606            const int piece = pool[j];
607            struct tr_refill_piece * setme = p + j;
608
609            setme->piece = piece;
610            setme->priority = inf->pieces[piece].priority;
611            setme->peerCount = 0;
612            setme->fastAllowed = 0;
613            setme->random = tr_rand( UINT16_MAX );
614            /* FIXME */
615//            setme->fastAllowed = tr_bitfieldHas( t->tor->allowedList, i);
616
617            for( k=0; k<peerCount; ++k ) {
618                const tr_peer * peer = peers[k];
619                if( peer->peerIsInterested && !peer->clientIsChoked && tr_bitfieldHas( peer->have, piece ) )
620                    ++setme->peerCount;
621            }
622        }
623
624        qsort (p, poolSize, sizeof(struct tr_refill_piece), compareRefillPiece);
625
626        for( j=0; j<poolSize; ++j )
627            pool[j] = p[j].piece;
628
629        tr_free( p );
630    }
631
632    tr_free( peers );
633
634    *pieceCount = poolSize;
635    return pool;
636}
637
638static uint64_t*
639getPreferredBlocks( Torrent * t, uint64_t * setmeCount )
640{
641    uint32_t i;
642    uint32_t pieceCount;
643    uint32_t * pieces;
644    uint64_t *req, *unreq, *ret, *walk;
645    int reqCount, unreqCount;
646    const tr_torrent * tor = t->tor;
647
648    assert( torrentIsLocked( t ) );
649
650    pieces = getPreferredPieces( t, &pieceCount );
651
652    req = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
653    reqCount = 0;
654    unreq = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
655    unreqCount = 0;
656
657    for( i=0; i<pieceCount; ++i ) {
658        const uint32_t index = pieces[i];
659        const int begin = tr_torPieceFirstBlock( tor, index );
660        const int end = begin + tr_torPieceCountBlocks( tor, (int)index );
661        int block;
662        for( block=begin; block<end; ++block )
663            if( tr_cpBlockIsComplete( tor->completion, block ) )
664                continue;
665            else if( tr_bitfieldHas( t->requested, block ) )
666                req[reqCount++] = block;
667            else
668                unreq[unreqCount++] = block;
669    }
670
671    ret = walk = tr_new( uint64_t, unreqCount + reqCount );
672    memcpy( walk, unreq, sizeof(uint64_t) * unreqCount );
673    walk += unreqCount;
674    memcpy( walk, req, sizeof(uint64_t) * reqCount );
675    walk += reqCount;
676    assert( ( walk - ret ) == ( unreqCount + reqCount ) );
677    *setmeCount = walk - ret;
678
679    tr_free( req );
680    tr_free( unreq );
681    tr_free( pieces );
682
683    return ret;
684}
685
686static int
687refillPulse( void * vtorrent )
688{
689    Torrent * t = vtorrent;
690    tr_torrent * tor = t->tor;
691    uint32_t i;
692    int peerCount;
693    tr_peer ** peers;
694    uint64_t blockCount;
695    uint64_t * blocks;
696
697    if( !t->isRunning )
698        return TRUE;
699    if( tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE )
700        return TRUE;
701
702    torrentLock( t );
703    tordbg( t, "Refilling Request Buffers..." );
704
705    blocks = getPreferredBlocks( t, &blockCount );
706    peers = getConnectedPeers( t, &peerCount );
707
708    for( i=0; peerCount && i<blockCount; ++i )
709    {
710        const int block = blocks[i];
711        const uint32_t index = tr_torBlockPiece( tor, block );
712        const uint32_t begin = (block * tor->blockSize) - (index * tor->info.pieceSize);
713        const uint32_t length = tr_torBlockCountBytes( tor, block );
714        int j;
715        assert( _tr_block( tor, index, begin ) == block );
716        assert( begin < (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
717        assert( (begin + length) <= (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
718
719
720        /* find a peer who can ask for this block */
721        for( j=0; j<peerCount; )
722        {
723            const int val = tr_peerMsgsAddRequest( peers[j]->msgs, index, begin, length );
724            switch( val )
725            {
726                case TR_ADDREQ_FULL: 
727                case TR_ADDREQ_CLIENT_CHOKED:
728                    memmove( peers+j, peers+j+1, sizeof(tr_peer*)*(--peerCount-j) );
729                    break;
730
731                case TR_ADDREQ_MISSING: 
732                case TR_ADDREQ_DUPLICATE: 
733                    ++j;
734                    break;
735
736                case TR_ADDREQ_OK:
737                    tr_bitfieldAdd( t->requested, block );
738                    j = peerCount;
739                    break;
740
741                default:
742                    assert( 0 && "unhandled value" );
743                    break;
744            }
745        }
746    }
747
748    /* cleanup */
749    tr_free( peers );
750    tr_free( blocks );
751
752    t->refillTimer = NULL;
753    torrentUnlock( t );
754    return FALSE;
755}
756
757static void
758broadcastClientHave( Torrent * t, uint32_t index )
759{
760    int i, size;
761    tr_peer ** peers;
762
763    assert( torrentIsLocked( t ) );
764
765    peers = getConnectedPeers( t, &size );
766    for( i=0; i<size; ++i )
767        tr_peerMsgsHave( peers[i]->msgs, index );
768    tr_free( peers );
769}
770
771static void
772broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
773{
774    int i, size;
775    tr_peer ** peers;
776
777    assert( torrentIsLocked( t ) );
778
779    peers = getConnectedPeers( t, &size );
780    for( i=0; i<size; ++i )
781        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
782    tr_free( peers );
783}
784
785static void
786msgsCallbackFunc( void * vpeer, void * vevent, void * vt )
787{
788    tr_peer * peer = vpeer;
789    Torrent * t = (Torrent *) vt;
790    const tr_peermsgs_event * e = (const tr_peermsgs_event *) vevent;
791
792    torrentLock( t );
793
794    switch( e->eventType )
795    {
796        case TR_PEERMSG_NEED_REQ:
797            if( t->refillTimer == NULL )
798                t->refillTimer = tr_timerNew( t->manager->handle,
799                                              refillPulse, t,
800                                              REFILL_PERIOD_MSEC );
801            break;
802
803        case TR_PEERMSG_CANCEL:
804            tr_bitfieldRem( t->requested, _tr_block( t->tor, e->pieceIndex, e->offset ) );
805            break;
806
807        case TR_PEERMSG_CLIENT_HAVE:
808            broadcastClientHave( t, e->pieceIndex );
809            tr_torrentRecheckCompleteness( t->tor );
810            break;
811
812        case TR_PEERMSG_PEER_PROGRESS: {
813            struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
814            const int peerIsSeed = e->progress >= 1.0;
815            if( peerIsSeed ) {
816                tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr(&atom->addr,atom->port) );
817                atom->flags |= ADDED_F_SEED_FLAG;
818            } else {
819                tordbg( t, "marking peer %s as a non-seed", tr_peerIoAddrStr(&atom->addr,atom->port) );
820                atom->flags &= ~ADDED_F_SEED_FLAG;
821            } break;
822        }
823
824        case TR_PEERMSG_CLIENT_BLOCK:
825            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
826            break;
827
828        case TR_PEERMSG_GOT_ERROR:
829            peer->doPurge = 1;
830            break;
831
832        default:
833            assert(0);
834    }
835
836    torrentUnlock( t );
837}
838
839static void
840ensureAtomExists( Torrent * t, const struct in_addr * addr, uint16_t port, uint8_t flags, uint8_t from )
841{
842    if( !peerIsKnown( t, addr ) )
843    {
844        struct peer_atom * a = tr_new( struct peer_atom, 1 );
845        a->addr = *addr;
846        a->port = port;
847        a->flags = flags;
848        a->from = from;
849        a->time = 0;
850        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr(&a->addr,a->port) );
851        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
852    }
853}
854
855/* FIXME: this is kind of a mess. */
856static void
857myHandshakeDoneCB( tr_handshake    * handshake,
858                   tr_peerIo       * io,
859                   int               isConnected,
860                   const uint8_t   * peer_id,
861                   void            * vmanager )
862{
863    int ok = isConnected;
864    uint16_t port;
865    const struct in_addr * in_addr;
866    tr_peerMgr * manager = (tr_peerMgr*) vmanager;
867    Torrent * t;
868    tr_handshake * ours;
869
870    assert( io != NULL );
871    assert( isConnected==0 || isConnected==1 );
872
873    t = tr_peerIoHasTorrentHash( io )
874        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
875        : NULL;
876
877    if( tr_peerIoIsIncoming ( io ) )
878        ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
879                                        handshake, handshakeCompare );
880    else if( t != NULL )
881        ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
882                                        handshake, handshakeCompare );
883    else
884        ours = handshake;
885
886    assert( ours != NULL );
887    assert( ours == handshake );
888
889    if( t != NULL )
890        torrentLock( t );
891
892    in_addr = tr_peerIoGetAddress( io, &port );
893
894    if( !t || !t->isRunning )
895    {
896        tr_peerIoFree( io );
897        --manager->connectionCount;
898    }
899    else if( !ok )
900    {
901        /* if we couldn't connect or were snubbed,
902         * the peer's probably not worth remembering. */
903        tr_peer * peer = getExistingPeer( t, in_addr );
904        tr_peerIoFree( io );
905        --manager->connectionCount;
906        if( peer )
907            peer->doPurge = 1;
908    }
909    else /* looking good */
910    {
911        tr_peer * peer = getPeer( t, in_addr );
912        uint16_t port;
913        const struct in_addr * addr = tr_peerIoGetAddress( io,  &port );
914        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
915        tr_free( peer->client );
916        peer->client = peer_id ? tr_clientForId( peer_id ) : NULL;
917        if( peer->msgs != NULL ) { /* we already have this peer */
918            tr_peerIoFree( io );
919            --manager->connectionCount;
920        } else {
921            peer->port = port;
922            peer->io = io;
923            peer->msgs = tr_peerMsgsNew( t->tor, peer, msgsCallbackFunc, t, &peer->msgsTag );
924        }
925    }
926
927    if( t != NULL )
928        torrentUnlock( t );
929}
930
931void
932tr_peerMgrAddIncoming( tr_peerMgr      * manager,
933                       struct in_addr  * addr,
934                       uint16_t          port,
935                       int               socket )
936{
937    managerLock( manager );
938
939    if( getExistingHandshake( manager->incomingHandshakes, addr ) == NULL )
940    {
941        tr_peerIo * io = tr_peerIoNewIncoming( manager->handle, addr, port, socket );
942
943        tr_handshake * handshake = tr_handshakeNew( io,
944                                                    manager->handle->encryptionMode,
945                                                    myHandshakeDoneCB,
946                                                    manager );
947        ++manager->connectionCount;
948
949        tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake, handshakeCompare );
950    }
951
952    managerUnlock( manager );
953}
954
955void
956tr_peerMgrAddPex( tr_peerMgr     * manager,
957                  const uint8_t  * torrentHash,
958                  int              from,
959                  const tr_pex   * pex,
960                  int              pexCount )
961{
962    Torrent * t;
963    const tr_pex * end;
964
965    managerLock( manager );
966
967    t = getExistingTorrent( manager, torrentHash );
968    for( end=pex+pexCount; pex!=end; ++pex )
969        ensureAtomExists( t, &pex->in_addr, pex->port, pex->flags, from );
970
971    managerUnlock( manager );
972}
973
974void
975tr_peerMgrAddPeers( tr_peerMgr    * manager,
976                    const uint8_t * torrentHash,
977                    int             from,
978                    const uint8_t * peerCompact,
979                    int             peerCount )
980{
981    int i;
982    const uint8_t * walk = peerCompact;
983    Torrent * t;
984
985    managerLock( manager );
986
987    t = getExistingTorrent( manager, torrentHash );
988    for( i=0; t!=NULL && i<peerCount; ++i )
989    {
990        struct in_addr addr;
991        uint16_t port;
992        memcpy( &addr, walk, 4 ); walk += 4;
993        memcpy( &port, walk, 2 ); walk += 2;
994        ensureAtomExists( t, &addr, port, 0, from );
995    }
996
997    managerUnlock( manager );
998}
999
1000/**
1001***
1002**/
1003
1004void
1005tr_peerMgrSetBlame( tr_peerMgr     * manager UNUSED,
1006                    const uint8_t  * torrentHash UNUSED,
1007                    int              pieceIndex UNUSED,
1008                    int              success UNUSED )
1009{
1010    /*fprintf( stderr, "FIXME: tr_peerMgrSetBlame\n" );*/
1011}
1012
1013int
1014tr_pexCompare( const void * va, const void * vb )
1015{
1016    const tr_pex * a = (const tr_pex *) va;
1017    const tr_pex * b = (const tr_pex *) vb;
1018    int i = memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
1019    if( i ) return i;
1020    if( a->port < b->port ) return -1;
1021    if( a->port > b->port ) return 1;
1022    return 0;
1023}
1024
1025int tr_pexCompare( const void * a, const void * b );
1026
1027static int
1028peerPrefersCrypto( const tr_peer * peer )
1029{
1030    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1031        return TRUE;
1032
1033    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1034        return FALSE;
1035
1036    return tr_peerIoIsEncrypted( peer->io );
1037};
1038
1039int
1040tr_peerMgrGetPeers( tr_peerMgr      * manager,
1041                    const uint8_t   * torrentHash,
1042                    tr_pex         ** setme_pex )
1043{
1044    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1045    int i, peerCount;
1046    const tr_peer ** peers;
1047    tr_pex * pex;
1048    tr_pex * walk;
1049
1050    torrentLock( (Torrent*)t );
1051
1052    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1053    pex = walk = tr_new( tr_pex, peerCount );
1054
1055    for( i=0; i<peerCount; ++i, ++walk )
1056    {
1057        const tr_peer * peer = peers[i];
1058
1059        walk->in_addr = peer->in_addr;
1060
1061        walk->port = peer->port;
1062
1063        walk->flags = 0;
1064        if( peerPrefersCrypto(peer) )  walk->flags |= ADDED_F_ENCRYPTION_FLAG;
1065        if( peer->progress >= 1.0 )    walk->flags |= ADDED_F_SEED_FLAG;
1066    }
1067
1068    assert( ( walk - pex ) == peerCount );
1069    qsort( pex, peerCount, sizeof(tr_pex), tr_pexCompare );
1070    *setme_pex = pex;
1071
1072    torrentUnlock( (Torrent*)t );
1073
1074    return peerCount;
1075}
1076
1077static int reconnectPulse( void * vtorrent );
1078static int rechokePulse( void * vtorrent );
1079
1080void
1081tr_peerMgrStartTorrent( tr_peerMgr     * manager,
1082                        const uint8_t  * torrentHash )
1083{
1084    Torrent * t;
1085
1086    managerLock( manager );
1087
1088    t = getExistingTorrent( manager, torrentHash );
1089
1090    assert( t != NULL );
1091    assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
1092    assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
1093
1094    if( !t->isRunning )
1095    {
1096        t->isRunning = 1;
1097
1098        t->reconnectTimer = tr_timerNew( t->manager->handle,
1099                                         reconnectPulse, t,
1100                                         RECONNECT_PERIOD_MSEC );
1101
1102        t->rechokeTimer = tr_timerNew( t->manager->handle,
1103                                       rechokePulse, t,
1104                                       RECHOKE_PERIOD_MSEC );
1105
1106        reconnectPulse( t );
1107
1108        rechokePulse( t );
1109    }
1110
1111    managerUnlock( manager );
1112}
1113
1114static void
1115stopTorrent( Torrent * t )
1116{
1117    assert( torrentIsLocked( t ) );
1118
1119    t->isRunning = 0;
1120    tr_timerFree( &t->rechokeTimer );
1121    tr_timerFree( &t->reconnectTimer );
1122
1123    /* disconnect the peers. */
1124    tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)peerDestructor );
1125    tr_ptrArrayClear( t->peers );
1126
1127    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1128     * which removes the handshake from t->outgoingHandshakes... */
1129    while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
1130        tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
1131}
1132void
1133tr_peerMgrStopTorrent( tr_peerMgr     * manager,
1134                       const uint8_t  * torrentHash)
1135{
1136    managerLock( manager );
1137
1138    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1139
1140    managerUnlock( manager );
1141}
1142
1143void
1144tr_peerMgrAddTorrent( tr_peerMgr * manager,
1145                      tr_torrent * tor )
1146{
1147    Torrent * t;
1148
1149    managerLock( manager );
1150
1151    assert( tor != NULL );
1152    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1153
1154    t = torrentConstructor( manager, tor );
1155    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1156
1157    managerUnlock( manager );
1158}
1159
1160void
1161tr_peerMgrRemoveTorrent( tr_peerMgr     * manager,
1162                         const uint8_t  * torrentHash )
1163{
1164    Torrent * t;
1165
1166    managerLock( manager );
1167
1168    t = getExistingTorrent( manager, torrentHash );
1169    assert( t != NULL );
1170    stopTorrent( t );
1171    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
1172    torrentDestructor( t );
1173
1174    managerUnlock( manager );
1175}
1176
1177void
1178tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1179                               const uint8_t    * torrentHash,
1180                               int8_t           * tab,
1181                               int                tabCount )
1182{
1183    int i;
1184    const Torrent * t;
1185    const tr_torrent * tor;
1186    float interval;
1187
1188    managerLock( (tr_peerMgr*)manager );
1189
1190    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1191    tor = t->tor;
1192    interval = tor->info.pieceCount / (float)tabCount;
1193
1194    memset( tab, 0, tabCount );
1195
1196    for( i=0; i<tabCount; ++i )
1197    {
1198        const int piece = i * interval;
1199
1200        if( tor == NULL )
1201            tab[i] = 0;
1202        else if( tr_cpPieceIsComplete( tor->completion, piece ) )
1203            tab[i] = -1;
1204        else {
1205            int j, peerCount;
1206            const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1207            for( j=0; j<peerCount; ++j )
1208                if( tr_bitfieldHas( peers[j]->have, i ) )
1209                    ++tab[i];
1210        }
1211    }
1212
1213    managerUnlock( (tr_peerMgr*)manager );
1214}
1215
1216/* Returns the pieces that we and/or a connected peer has */
1217tr_bitfield*
1218tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1219                        const uint8_t    * torrentHash )
1220{
1221    int i, size;
1222    const Torrent * t;
1223    const tr_peer ** peers;
1224    tr_bitfield * pieces;
1225
1226    managerLock( (tr_peerMgr*)manager );
1227
1228    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1229    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1230    pieces = tr_bitfieldDup( tr_cpPieceBitfield( t->tor->completion ) );
1231    for( i=0; i<size; ++i )
1232        if( peers[i]->io != NULL )
1233            tr_bitfieldAnd( pieces, peers[i]->have );
1234
1235    managerUnlock( (tr_peerMgr*)manager );
1236    return pieces;
1237}
1238
1239void
1240tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1241                        const uint8_t    * torrentHash,
1242                        int              * setmePeersKnown,
1243                        int              * setmePeersConnected,
1244                        int              * setmePeersSendingToUs,
1245                        int              * setmePeersGettingFromUs,
1246                        int              * setmePeersFrom )
1247{
1248    int i, size;
1249    const Torrent * t;
1250    const tr_peer ** peers;
1251
1252    managerLock( (tr_peerMgr*)manager );
1253
1254    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1255    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1256
1257    *setmePeersKnown          = tr_ptrArraySize( t->pool );
1258    *setmePeersConnected      = 0;
1259    *setmePeersSendingToUs    = 0;
1260    *setmePeersGettingFromUs  = 0;
1261
1262    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1263        setmePeersFrom[i] = 0;
1264
1265    for( i=0; i<size; ++i )
1266    {
1267        const tr_peer * peer = peers[i];
1268        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1269
1270        if( peer->io == NULL ) /* not connected */
1271            continue;
1272
1273        ++*setmePeersConnected;
1274
1275        ++setmePeersFrom[atom->from];
1276
1277        if( peer->rateToPeer > 0.01 )
1278            ++*setmePeersGettingFromUs;
1279
1280        if( peer->rateToClient > 0.01 )
1281            ++*setmePeersSendingToUs;
1282    }
1283
1284    managerUnlock( (tr_peerMgr*)manager );
1285}
1286
1287struct tr_peer_stat *
1288tr_peerMgrPeerStats( const tr_peerMgr  * manager,
1289                     const uint8_t     * torrentHash,
1290                     int               * setmeCount UNUSED )
1291{
1292    int i, size;
1293    const Torrent * t;
1294    const tr_peer ** peers;
1295    tr_peer_stat * ret;
1296
1297    assert( manager != NULL );
1298    managerLock( (tr_peerMgr*)manager );
1299
1300    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1301    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1302
1303    ret = tr_new0( tr_peer_stat, size );
1304
1305    for( i=0; i<size; ++i )
1306    {
1307        const tr_peer * peer = peers[i];
1308        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1309        tr_peer_stat * stat = ret + i;
1310
1311        tr_netNtop( &peer->in_addr, stat->addr, sizeof(stat->addr) );
1312        stat->port             = peer->port;
1313        stat->from             = atom->from;
1314        stat->client           = tr_strdup( peer->client ? peer->client : "" );
1315        stat->progress         = peer->progress;
1316        stat->isEncrypted      = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1317        stat->uploadToRate     = peer->rateToPeer;
1318        stat->downloadFromRate = peer->rateToClient;
1319        stat->isDownloading    = stat->uploadToRate > 0.01;
1320        stat->isUploading      = stat->downloadFromRate > 0.01;
1321    }
1322
1323    *setmeCount = size;
1324
1325    managerUnlock( (tr_peerMgr*)manager );
1326    return ret;
1327}
1328
1329/**
1330***
1331**/
1332
1333struct ChokeData
1334{
1335    tr_peer * peer;
1336    float rate;
1337    int randomKey;
1338    int preferred;
1339    int doUnchoke;
1340};
1341
1342static int
1343compareChoke( const void * va, const void * vb )
1344{
1345    const struct ChokeData * a = va;
1346    const struct ChokeData * b = vb;
1347
1348    if( a->preferred != b->preferred )
1349        return a->preferred ? -1 : 1;
1350
1351    if( a->preferred )
1352    {
1353        if( a->rate > b->rate ) return -1;
1354        if( a->rate < b->rate ) return 1;
1355        return 0;
1356    }
1357    else
1358    {
1359        return a->randomKey - b->randomKey;
1360    }
1361}
1362
1363static int
1364clientIsSnubbedBy( const tr_peer * peer )
1365{
1366    assert( peer != NULL );
1367
1368    return peer->peerSentPieceDataAt < (time(NULL) - SNUBBED_SEC);
1369}
1370
1371/**
1372***
1373**/
1374
1375static double
1376getWeightedThroughput( const tr_peer * peer )
1377{
1378    return ( 3 * peer->rateToPeer )
1379         + ( 1 * peer->rateToClient );
1380}
1381
1382static void
1383rechoke( Torrent * t )
1384{
1385    int i, peerCount, size=0, unchoked=0;
1386    const time_t ignorePeersNewerThan = time(NULL) - MIN_CHOKE_PERIOD_SEC;
1387    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1388    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
1389
1390    assert( torrentIsLocked( t ) );
1391   
1392    /* sort the peers by preference and rate */
1393    for( i=0; i<peerCount; ++i )
1394    {
1395        tr_peer * peer = peers[i];
1396        struct ChokeData * node;
1397        if( peer->chokeChangedAt > ignorePeersNewerThan )
1398            continue;
1399
1400        node = &choke[size++];
1401        node->peer = peer;
1402        node->preferred = peer->peerIsInterested && !clientIsSnubbedBy(peer);
1403        node->randomKey = tr_rand( INT_MAX );
1404        node->rate = getWeightedThroughput( peer );
1405    }
1406
1407    qsort( choke, size, sizeof(struct ChokeData), compareChoke );
1408
1409    for( i=0; i<size && i<NUM_UNCHOKED_PEERS_PER_TORRENT; ++i ) {
1410        choke[i].doUnchoke = 1;
1411        ++unchoked;
1412    }
1413
1414    for( ; i<size; ++i ) {
1415        choke[i].doUnchoke = 1;
1416        ++unchoked;
1417        if( choke[i].peer->peerIsInterested )
1418            break;
1419    }
1420
1421    for( i=0; i<size; ++i )
1422        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1423
1424    /* cleanup */
1425    tr_free( choke );
1426    tr_free( peers );
1427}
1428
1429static int
1430rechokePulse( void * vtorrent )
1431{
1432    Torrent * t = vtorrent;
1433    torrentLock( t );
1434    rechoke( t );
1435    torrentUnlock( t );
1436    return TRUE;
1437}
1438
1439/***
1440****
1441****
1442****
1443***/
1444
1445#define LAISSEZ_FAIRE_PERIOD_SECS 90
1446
1447static tr_peer **
1448getWeakConnections( Torrent * t, int * setmeSize )
1449{
1450    int i, insize, outsize;
1451    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( t->peers, &insize );
1452    struct tr_peer ** ret = tr_new( tr_peer*, insize );
1453    const int clientIsSeed = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1454    const time_t now = time( NULL );
1455
1456    assert( torrentIsLocked( t ) );
1457
1458    for( i=outsize=0; i<insize; ++i )
1459    {
1460        tr_peer * peer = peers[i];
1461        int isWeak;
1462        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1463        const int peerIsSeed = atom->flags & ADDED_F_SEED_FLAG;
1464        const double throughput = getWeightedThroughput( peer );
1465
1466        assert( atom != NULL );
1467
1468        if( throughput >= 3 )
1469            isWeak = FALSE;
1470        else if( peer->doPurge )
1471            isWeak = TRUE;
1472        else if( peerIsSeed && clientIsSeed )
1473            isWeak = t->tor->pexDisabled || (now-atom->time>=30);
1474        else if( ( now - atom->time ) < LAISSEZ_FAIRE_PERIOD_SECS )
1475            isWeak = FALSE;
1476        else
1477            isWeak = ( now - peer->pieceDataActivityDate ) > 180;
1478
1479        if( isWeak )
1480            ret[outsize++] = peer;
1481    }
1482
1483    *setmeSize = outsize;
1484    return ret;
1485}
1486
1487static int
1488compareAtomByTime( const void * va, const void * vb )
1489{
1490    const struct peer_atom * a = * (const struct peer_atom**) va;
1491    const struct peer_atom * b = * (const struct peer_atom**) vb;
1492    if( a->time < b->time ) return -1;
1493    if( a->time > b->time ) return 1;
1494    return 0;
1495}
1496
1497static struct peer_atom **
1498getPeerCandidates( Torrent * t, int * setmeSize )
1499{
1500    int i, insize, outsize;
1501    struct peer_atom ** atoms;
1502    struct peer_atom ** ret;
1503    const time_t now = time( NULL );
1504    const int seed = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1505
1506    assert( torrentIsLocked( t ) );
1507
1508    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &insize );
1509    ret = tr_new( struct peer_atom*, insize );
1510    for( i=outsize=0; i<insize; ++i )
1511    {
1512        struct peer_atom * atom = atoms[i];
1513
1514        /* we don't need two connections to the same peer... */
1515        if( peerIsInUse( t, &atom->addr ) ) {
1516            tordbg( t, "RECONNECT peer %d (%s) is in use..",
1517                    i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1518            continue;
1519        }
1520
1521        /* no need to connect if we're both seeds... */
1522        if( seed && (atom->flags & ADDED_F_SEED_FLAG) ) {
1523            tordbg( t, "RECONNECT peer %d (%s) is a seed and so are we..",
1524                    i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1525            continue;
1526        }
1527
1528        /* if we used this peer recently, give someone else a turn */
1529        if( ( now - atom->time ) < LAISSEZ_FAIRE_PERIOD_SECS ) {
1530            tordbg( t, "RECONNECT peer %d (%s) is in its grace period..",
1531                    i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1532            continue;
1533        }
1534
1535        ret[outsize++] = atom;
1536    }
1537
1538    qsort( ret, outsize, sizeof(struct peer_atom*), compareAtomByTime );
1539    *setmeSize = outsize;
1540    return ret;
1541}
1542
1543static int
1544reconnectPulse( void * vtorrent )
1545{
1546    Torrent * t = vtorrent;
1547
1548    torrentLock( t );
1549
1550    if( !t->isRunning )
1551    {
1552        removeAllPeers( t );
1553    }
1554    else
1555    {
1556        int i, nCandidates, nWeak, nAdd;
1557        struct peer_atom ** candidates = getPeerCandidates( t, &nCandidates );
1558        struct tr_peer ** connections = getWeakConnections( t, &nWeak );
1559        const int peerCount = tr_ptrArraySize( t->peers );
1560
1561        tordbg( t, "RECONNECT pulse for [%s]: %d weak connections, %d connection candidates, %d atoms, max per pulse is %d",
1562                 t->tor->info.name, nWeak, nCandidates, tr_ptrArraySize(t->pool), (int)MAX_RECONNECTIONS_PER_PULSE );
1563
1564        /* disconnect some peers */
1565        for( i=0; i<nWeak; ++i )
1566            removePeer( t, connections[i] );
1567
1568        /* add some new ones */
1569        nAdd = MAX_CONNECTED_PEERS_PER_TORRENT - peerCount;
1570        for( i=0; i<nAdd && i<nCandidates && i<MAX_RECONNECTIONS_PER_PULSE; ++i )
1571        //for( i=0; i<nCandidates; ++i )
1572        {
1573            tr_peerMgr * mgr = t->manager;
1574
1575            struct peer_atom * atom = candidates[i];
1576
1577            tr_peerIo * io = tr_peerIoNewOutgoing( mgr->handle, &atom->addr, atom->port, t->hash );
1578
1579            tr_handshake * handshake = tr_handshakeNew( io,
1580                                                        mgr->handle->encryptionMode,
1581                                                        myHandshakeDoneCB,
1582                                                        mgr );
1583            ++mgr->connectionCount;
1584
1585            assert( tr_peerIoGetTorrentHash( io ) != NULL );
1586
1587            tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake, handshakeCompare );
1588
1589            atom->time = time( NULL );
1590        }
1591
1592        /* cleanup */
1593        tr_free( connections );
1594        tr_free( candidates );
1595    }
1596
1597    torrentUnlock( t );
1598    return TRUE;
1599}
Note: See TracBrowser for help on using the repository browser.