source: trunk/libtransmission/peer-mgr.c @ 3254

Last change on this file since 3254 was 3254, checked in by charles, 15 years ago

simplify libT locks now that it's (more-or-less) single-threaded. fix deadlocks. make tr_locks nestable.

  • Property svn:keywords set to Date Rev Author Id
File size: 43.3 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 3254 2007-10-01 15:17:15Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17#include <limits.h> /* INT_MAX */
18
19#include "transmission.h"
20#include "clients.h"
21#include "completion.h"
22#include "crypto.h"
23#include "handshake.h"
24#include "net.h"
25#include "peer-io.h"
26#include "peer-mgr.h"
27#include "peer-mgr-private.h"
28#include "peer-msgs.h"
29#include "platform.h"
30#include "ptrarray.h"
31#include "ratecontrol.h"
32#include "shared.h"
33#include "trevent.h"
34#include "utils.h"
35
36enum
37{
38    /* how frequently to change which peers are choked */
39    RECHOKE_PERIOD_MSEC = (15 * 1000),
40
41    /* how frequently to decide which peers live and die */
42    RECONNECT_PERIOD_MSEC = (15 * 1000),
43
44    /* how frequently to refill peers' request lists */
45    REFILL_PERIOD_MSEC = 1000,
46
47    /* how many peers to unchoke per-torrent. */
48    /* FIXME: make this user-configurable? */
49    NUM_UNCHOKED_PEERS_PER_TORRENT = 8,
50
51    /* don't change a peer's choke status more often than this */
52    MIN_CHOKE_PERIOD_SEC = 10,
53
54    /* how soon is `soon' in the rechokeSoon, reconnecSoon funcs */
55    SOON_MSEC = 1000,
56
57    /* following the BT spec, we consider ourselves `snubbed' if
58     * we're we don't get piece data from a peer in this long */
59    SNUBBED_SEC = 60,
60
61    /* this is arbitrary and, hopefully, temporary until we come up
62     * with a better idea for managing the connection limits */
63    MAX_CONNECTED_PEERS_PER_TORRENT = 60,
64};
65
66/**
67***
68**/
69
70/* We keep one of these for every peer we know about, whether
71 * it's connected or not, so the struct must be small.
72 * When our current connections underperform, we dip back
73 * int this list for new ones. */
74struct peer_atom
75{   
76    uint8_t from;
77    uint8_t flags; /* these match the added_f flags */
78    uint16_t port;
79    struct in_addr addr; 
80    time_t time;
81};
82
83typedef struct
84{
85    uint8_t hash[SHA_DIGEST_LENGTH];
86    tr_ptrArray * pool; /* struct peer_atom */
87    tr_ptrArray * peers; /* tr_peer */
88    tr_timer * reconnectTimer;
89    tr_timer * reconnectSoonTimer;
90    tr_timer * rechokeTimer;
91    tr_timer * rechokeSoonTimer;
92    tr_timer * refillTimer;
93    tr_torrent * tor;
94    tr_bitfield * requested;
95
96    unsigned int isRunning : 1;
97
98    struct tr_peerMgr * manager;
99}
100Torrent;
101
102struct tr_peerMgr
103{
104    tr_handle * handle;
105    tr_ptrArray * torrents; /* Torrent */
106    int connectionCount;
107    tr_ptrArray * handshakes; /* in-process */
108};
109
110/**
111***
112**/
113
114static void
115managerLock( struct tr_peerMgr * manager )
116{
117    tr_globalLock( manager->handle );
118}
119static void
120managerUnlock( struct tr_peerMgr * manager )
121{
122    tr_globalUnlock( manager->handle );
123}
124static void
125torrentLock( Torrent * torrent )
126{
127    managerLock( torrent->manager );
128}
129static void
130torrentUnlock( Torrent * torrent )
131{
132    managerUnlock( torrent->manager );
133}
134static int
135torrentIsLocked( const Torrent * t )
136{
137    return ( t != NULL ) 
138        && ( tr_globalIsLocked( t->manager->handle ) );
139}
140
141/**
142***
143**/
144
145static int
146compareAddresses( const struct in_addr * a, const struct in_addr * b )
147{
148    return tr_compareUint32( a->s_addr, b->s_addr );
149}
150
151static int
152handshakeCompareToAddr( const void * va, const void * vb )
153{
154    const tr_handshake * a = va;
155    return compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
156}
157
158static int
159handshakeCompare( const void * a, const void * b )
160{
161    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
162}
163
164static tr_handshake*
165getExistingHandshake( tr_peerMgr * mgr, const struct in_addr * in_addr )
166{
167    return tr_ptrArrayFindSorted( mgr->handshakes,
168                                  in_addr,
169                                  handshakeCompareToAddr );
170}
171
172static int
173comparePeerAtomToAddress( const void * va, const void * vb )
174{
175    const struct peer_atom * a = va;
176    return compareAddresses( &a->addr, vb );
177}
178
179static int
180comparePeerAtoms( const void * va, const void * vb )
181{
182    const struct peer_atom * b = vb;
183    return comparePeerAtomToAddress( va, &b->addr );
184}
185
186/**
187***
188**/
189
190static int
191torrentCompare( const void * va, const void * vb )
192{
193    const Torrent * a = va;
194    const Torrent * b = vb;
195    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
196}
197
198static int
199torrentCompareToHash( const void * va, const void * vb )
200{
201    const Torrent * a = (const Torrent*) va;
202    const uint8_t * b_hash = (const uint8_t*) vb;
203    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
204}
205
206static Torrent*
207getExistingTorrent( tr_peerMgr * manager, const uint8_t * hash )
208{
209    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
210                                             hash,
211                                             torrentCompareToHash );
212}
213
214static int
215peerCompare( const void * va, const void * vb )
216{
217    const tr_peer * a = (const tr_peer *) va;
218    const tr_peer * b = (const tr_peer *) vb;
219    return compareAddresses( &a->in_addr, &b->in_addr );
220}
221
222static int
223peerCompareToAddr( const void * va, const void * vb )
224{
225    const tr_peer * a = (const tr_peer *) va;
226    return compareAddresses( &a->in_addr, vb );
227}
228
229static tr_peer*
230getExistingPeer( Torrent * torrent, const struct in_addr * in_addr )
231{
232    assert( torrentIsLocked( torrent ) );
233    assert( in_addr != NULL );
234
235    return (tr_peer*) tr_ptrArrayFindSorted( torrent->peers,
236                                             in_addr,
237                                             peerCompareToAddr );
238}
239
240static struct peer_atom*
241getExistingAtom( const Torrent * t, const struct in_addr * addr )
242{
243    assert( torrentIsLocked( t ) );
244    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
245}
246
247static int
248peerIsKnown( const Torrent * t, const struct in_addr * addr )
249{
250    return getExistingAtom( t, addr ) != NULL;
251}
252
253static int
254peerIsInUse( const Torrent * t, const struct in_addr * addr )
255{
256    assert( torrentIsLocked ( t ) );
257
258    return ( getExistingPeer( (Torrent*)t, addr ) != NULL )
259        || ( getExistingHandshake( ((Torrent*)t)->manager, addr ) != NULL );
260}
261
262static tr_peer*
263getPeer( Torrent * torrent, const struct in_addr * in_addr )
264{
265    tr_peer * peer;
266
267    assert( torrentIsLocked( torrent ) );
268
269    peer = getExistingPeer( torrent, in_addr );
270
271    if( peer == NULL )
272    {
273        peer = tr_new0( tr_peer, 1 );
274        memcpy( &peer->in_addr, in_addr, sizeof(struct in_addr) );
275        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
276    }
277
278    return peer;
279}
280
281static void
282disconnectPeer( tr_peer * peer )
283{
284    assert( peer != NULL );
285
286    tr_peerIoFree( peer->io );
287    peer->io = NULL;
288
289    if( peer->msgs != NULL )
290    {
291        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
292        tr_peerMsgsFree( peer->msgs );
293        peer->msgs = NULL;
294    }
295
296    tr_bitfieldFree( peer->have );
297    peer->have = NULL;
298
299    tr_bitfieldFree( peer->blame );
300    peer->blame = NULL;
301
302    tr_bitfieldFree( peer->banned );
303    peer->banned = NULL;
304}
305
306static void
307freePeer( tr_peer * peer )
308{
309    disconnectPeer( peer );
310    tr_free( peer->client );
311    tr_free( peer );
312}
313
314static void
315removePeer( Torrent * t, tr_peer * peer )
316{
317    tr_peer * removed;
318    struct peer_atom * atom;
319
320    assert( torrentIsLocked( t ) );
321
322    atom = getExistingAtom( t, &peer->in_addr );
323    assert( atom != NULL );
324    atom->time = time( NULL );
325
326    removed = tr_ptrArrayRemoveSorted  ( t->peers, peer, peerCompare );
327    assert( removed == peer );
328    freePeer( removed );
329}
330
331static void
332freeTorrent( tr_peerMgr * manager, Torrent * t )
333{
334    uint8_t hash[SHA_DIGEST_LENGTH];
335
336    assert( t != NULL );
337    assert( t->peers != NULL );
338    assert( torrentIsLocked( t ) );
339    assert( getExistingTorrent( manager, t->hash ) != NULL );
340
341    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
342
343    tr_timerFree( &t->reconnectTimer );
344    tr_timerFree( &t->reconnectSoonTimer );
345    tr_timerFree( &t->rechokeTimer );
346    tr_timerFree( &t->rechokeSoonTimer );
347    tr_timerFree( &t->refillTimer );
348
349    tr_bitfieldFree( t->requested );
350    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
351    tr_ptrArrayFree( t->peers, (PtrArrayForeachFunc)freePeer );
352    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
353    tr_free( t );
354
355    assert( getExistingTorrent( manager, hash ) == NULL );
356}
357
358/**
359***
360**/
361
362struct tr_bitfield *
363tr_peerMgrGenerateAllowedSet( const uint32_t         setCount,
364                              const uint32_t         pieceCount,
365                              const uint8_t          infohash[20],
366                              const struct in_addr * ip )
367{
368    /* This has been checked against the spec example implementation. Feeding it with :
369    setCount = 9, pieceCount = 1313, infohash = Oxaa,0xaa,...0xaa, ip = 80.4.4.200
370generate :
371    1059, 431, 808, 1217, 287, 376, 1188, 353, 508
372    but since we're storing in a bitfield, it won't be in this order... */
373    /* TODO : We should translate link-local IPv4 adresses to external IP,
374     * so that being on same local network gives us the same allowed pieces */
375   
376    printf( "%d piece allowed fast set for torrent with %d pieces and hex infohash\n", setCount, pieceCount );
377    printf( "%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x for node with IP %s:\n",
378            infohash[0], infohash[1], infohash[2], infohash[3], infohash[4], infohash[5], infohash[6], infohash[7], infohash[8], infohash[9],
379            infohash[10], infohash[11], infohash[12], infohash[13], infohash[14], infohash[15], infohash[16], infohash[7], infohash[18], infohash[19],
380            inet_ntoa( *ip ) );
381   
382    uint8_t *seed = malloc(4 + SHA_DIGEST_LENGTH);
383    char buf[4];
384    uint32_t allowedPieceCount = 0;
385    tr_bitfield_t * ret;
386   
387    ret = tr_bitfieldNew( pieceCount );
388   
389    /* We need a seed based on most significant bytes of peer address
390        concatenated with torrent's infohash */
391    *(uint32_t*)buf = ntohl( htonl(ip->s_addr) & 0xffffff00 );
392   
393    memcpy( seed, &buf, 4 );
394    memcpy( seed + 4, infohash, SHA_DIGEST_LENGTH );
395   
396    tr_sha1( seed, seed, 4 + SHA_DIGEST_LENGTH, NULL );
397   
398    while ( allowedPieceCount < setCount )
399    {
400        int i;
401        for ( i = 0 ; i < 5 && allowedPieceCount < setCount ; i++ )
402        {
403            /* We generate indices from 4-byte chunks of the seed */
404            uint32_t j = i * 4;
405            uint32_t y = ntohl( *(uint32_t*)(seed + j) );
406            uint32_t index = y % pieceCount;
407           
408            if ( !tr_bitfieldHas( ret, index ) )
409            {
410                tr_bitfieldAdd( ret, index );
411                allowedPieceCount++;
412            }
413        }
414        /* We randomize the seed, in case we need to iterate more */
415        tr_sha1( seed, seed, SHA_DIGEST_LENGTH, NULL );
416    }
417    tr_free( seed );
418   
419    return ret;
420}
421
422tr_peerMgr*
423tr_peerMgrNew( tr_handle * handle )
424{
425    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
426    m->handle = handle;
427    m->torrents = tr_ptrArrayNew( );
428    m->handshakes = tr_ptrArrayNew( );
429    return m;
430}
431
432void
433tr_peerMgrFree( tr_peerMgr * manager )
434{
435    managerLock( manager );
436
437    tr_ptrArrayFree( manager->handshakes, (PtrArrayForeachFunc)tr_handshakeAbort );
438    tr_ptrArrayFree( manager->torrents, (PtrArrayForeachFunc)freeTorrent );
439
440    managerUnlock( manager );
441    tr_free( manager );
442}
443
444static tr_peer**
445getConnectedPeers( Torrent * t, int * setmeCount )
446{
447    int i, peerCount, connectionCount;
448    tr_peer **peers;
449    tr_peer **ret;
450
451    assert( torrentIsLocked( t ) );
452
453    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
454    ret = tr_new( tr_peer*, peerCount );
455
456    for( i=connectionCount=0; i<peerCount; ++i )
457        if( peers[i]->msgs != NULL )
458            ret[connectionCount++] = peers[i];
459
460    *setmeCount = connectionCount;
461    return ret;
462}
463
464/***
465****  Refill
466***/
467
468struct tr_refill_piece
469{
470    tr_priority_t priority;
471    uint32_t piece;
472    uint32_t peerCount;
473    uint32_t fastAllowed;
474    uint8_t random;
475};
476
477static int
478compareRefillPiece (const void * aIn, const void * bIn)
479{
480    const struct tr_refill_piece * a = aIn;
481    const struct tr_refill_piece * b = bIn;
482
483    /* if one piece has a higher priority, it goes first */
484    if (a->priority != b->priority)
485        return a->priority > b->priority ? -1 : 1;
486
487    /* otherwise if one has fewer peers, it goes first */
488    if (a->peerCount != b->peerCount)
489        return a->peerCount < b->peerCount ? -1 : 1;
490   
491    /* otherwise if one *might be* fastallowed to us */
492    if (a->fastAllowed != b->fastAllowed)
493        return a->fastAllowed < b->fastAllowed ? -1 : 1;
494
495    /* otherwise go with our random seed */
496    return tr_compareUint8( a->random, b->random );
497}
498
499static int
500isPieceInteresting( const tr_torrent  * tor,
501                    int                 piece )
502{
503    if( tor->info.pieces[piece].dnd ) /* we don't want it */
504        return 0;
505
506    if( tr_cpPieceIsComplete( tor->completion, piece ) ) /* we already have it */
507        return 0;
508
509    return 1;
510}
511
512static uint32_t*
513getPreferredPieces( Torrent     * t,
514                    uint32_t    * pieceCount )
515{
516    const tr_torrent * tor = t->tor;
517    const tr_info * inf = &tor->info;
518    int i;
519    uint32_t poolSize = 0;
520    uint32_t * pool = tr_new( uint32_t, inf->pieceCount );
521    int peerCount;
522    tr_peer** peers;
523
524    assert( torrentIsLocked( t ) );
525
526    peers = getConnectedPeers( t, &peerCount );
527
528    for( i=0; i<inf->pieceCount; ++i )
529        if( isPieceInteresting( tor, i ) )
530            pool[poolSize++] = i;
531
532    /* sort the pool from most interesting to least... */
533    if( poolSize > 1 )
534    {
535        uint32_t j;
536        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
537
538        for( j=0; j<poolSize; ++j )
539        {
540            int k;
541            const int piece = pool[j];
542            struct tr_refill_piece * setme = p + j;
543
544            setme->piece = piece;
545            setme->priority = inf->pieces[piece].priority;
546            setme->peerCount = 0;
547            setme->fastAllowed = 0;
548            setme->random = tr_rand( UINT8_MAX );
549            /* FIXME */
550//            setme->fastAllowed = tr_bitfieldHas( t->tor->allowedList, i);
551
552            for( k=0; k<peerCount; ++k ) {
553                const tr_peer * peer = peers[k];
554                if( peer->peerIsInterested && !peer->clientIsChoked && tr_bitfieldHas( peer->have, piece ) )
555                    ++setme->peerCount;
556            }
557        }
558
559        qsort (p, poolSize, sizeof(struct tr_refill_piece), compareRefillPiece);
560
561        for( j=0; j<poolSize; ++j )
562            pool[j] = p[j].piece;
563
564        tr_free( p );
565    }
566
567#if 0
568fprintf (stderr, "new pool: ");
569for (i=0; i<15 && i<(int)poolSize; ++i ) fprintf (stderr, "%d, ", (int)pool[i] );
570fprintf (stderr, "\n");
571#endif
572    tr_free( peers );
573
574    *pieceCount = poolSize;
575    return pool;
576}
577
578static uint64_t*
579getPreferredBlocks( Torrent * t, uint64_t * setmeCount )
580{
581    uint32_t i;
582    uint32_t pieceCount;
583    uint32_t * pieces;
584    uint64_t *req, *unreq, *ret, *walk;
585    int reqCount, unreqCount;
586    const tr_torrent * tor = t->tor;
587
588    assert( torrentIsLocked( t ) );
589
590    pieces = getPreferredPieces( t, &pieceCount );
591/*fprintf( stderr, "REFILL refillPulse for {%s} got %d of %d pieces\n", tor->info.name, (int)pieceCount, t->tor->info.pieceCount );*/
592
593    req = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
594    reqCount = 0;
595    unreq = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
596    unreqCount = 0;
597
598    for( i=0; i<pieceCount; ++i ) {
599        const uint32_t index = pieces[i];
600        const int begin = tr_torPieceFirstBlock( tor, index );
601        const int end = begin + tr_torPieceCountBlocks( tor, (int)index );
602        int block;
603        for( block=begin; block<end; ++block )
604            if( tr_cpBlockIsComplete( tor->completion, block ) )
605                continue;
606            else if( tr_bitfieldHas( t->requested, block ) )
607                req[reqCount++] = block;
608            else
609                unreq[unreqCount++] = block;
610    }
611
612/*fprintf( stderr, "REFILL refillPulse for {%s} reqCount is %d, unreqCount is %d\n", tor->info.name, (int)reqCount, (int)unreqCount );*/
613    ret = walk = tr_new( uint64_t, unreqCount + reqCount );
614    memcpy( walk, unreq, sizeof(uint64_t) * unreqCount );
615    walk += unreqCount;
616    memcpy( walk, req, sizeof(uint64_t) * reqCount );
617    walk += reqCount;
618    assert( ( walk - ret ) == ( unreqCount + reqCount ) );
619    *setmeCount = walk - ret;
620
621    tr_free( req );
622    tr_free( unreq );
623    tr_free( pieces );
624
625    return ret;
626}
627
628static int
629refillPulse( void * vtorrent )
630{
631    Torrent * t = vtorrent;
632    tr_torrent * tor = t->tor;
633    uint32_t i;
634    int peerCount;
635    tr_peer ** peers;
636    uint64_t blockCount;
637    uint64_t * blocks;
638
639    if( !t->isRunning )
640        return TRUE;
641    if( tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE )
642        return TRUE;
643
644    torrentLock( t );
645
646    blocks = getPreferredBlocks( t, &blockCount );
647    peers = getConnectedPeers( t, &peerCount );
648
649/*fprintf( stderr, "REFILL refillPulse for {%s} got %d blocks\n", tor->info.name, (int)blockCount );*/
650
651    for( i=0; peerCount && i<blockCount; ++i )
652    {
653        const int block = blocks[i];
654        const uint32_t index = tr_torBlockPiece( tor, block );
655        const uint32_t begin = (block * tor->blockSize) - (index * tor->info.pieceSize);
656        const uint32_t length = tr_torBlockCountBytes( tor, block );
657        int j;
658        assert( _tr_block( tor, index, begin ) == block );
659        assert( begin < (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
660        assert( (begin + length) <= (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
661
662
663        /* find a peer who can ask for this block */
664        for( j=0; j<peerCount; )
665        {
666            const int val = tr_peerMsgsAddRequest( peers[j]->msgs, index, begin, length );
667            switch( val )
668            {
669                case TR_ADDREQ_FULL: 
670                case TR_ADDREQ_CLIENT_CHOKED:
671                    memmove( peers+j, peers+j+1, sizeof(tr_peer*)*(--peerCount-j) );
672                    break;
673
674                case TR_ADDREQ_MISSING: 
675                    ++j;
676                    break;
677
678                case TR_ADDREQ_OK:
679                    /*fprintf( stderr, "REFILL peer %p took the request for block %d\n", peers[j]->msgs, block );*/
680                    tr_bitfieldAdd( t->requested, block );
681                    j = peerCount;
682                    break;
683
684                default:
685                    assert( 0 && "unhandled value" );
686                    break;
687            }
688        }
689    }
690
691    /* cleanup */
692    tr_free( peers );
693    tr_free( blocks );
694
695    t->refillTimer = NULL;
696
697    torrentUnlock( t );
698    return FALSE;
699}
700
701static void
702broadcastClientHave( Torrent * t, uint32_t index )
703{
704    int i, size;
705    tr_peer ** peers;
706
707    assert( torrentIsLocked( t ) );
708
709    peers = getConnectedPeers( t, &size );
710    for( i=0; i<size; ++i )
711        tr_peerMsgsHave( peers[i]->msgs, index );
712    tr_free( peers );
713}
714
715static void
716broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
717{
718    int i, size;
719    tr_peer ** peers;
720
721    assert( torrentIsLocked( t ) );
722
723    peers = getConnectedPeers( t, &size );
724    for( i=0; i<size; ++i )
725        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
726    tr_free( peers );
727}
728
729/**
730***
731**/
732
733static int reconnectPulse( void * vtorrent );
734
735static void
736restartReconnectTimer( Torrent * t )
737{
738    tr_timerFree( &t->reconnectTimer );
739    if( t->isRunning )
740        t->reconnectTimer = tr_timerNew( t->manager->handle, reconnectPulse, t, RECONNECT_PERIOD_MSEC );
741}
742
743static void
744reconnectNow( Torrent * t )
745{
746    reconnectPulse( t );
747    restartReconnectTimer( t );
748}
749
750static int
751reconnectSoonCB( void * vt )
752{
753    Torrent * t = vt;
754    reconnectNow( t );
755    t->reconnectSoonTimer = NULL;
756    return FALSE;
757}
758
759static void
760reconnectSoon( Torrent * t )
761{
762    if( t->reconnectSoonTimer == NULL )
763        t->reconnectSoonTimer = tr_timerNew( t->manager->handle,
764                                             reconnectSoonCB, t, SOON_MSEC );
765}
766
767/**
768***
769**/
770
771static int rechokePulse( void * vtorrent );
772
773static void
774restartChokeTimer( Torrent * t )
775{
776    tr_timerFree( &t->rechokeTimer );
777    if( t->isRunning )
778        t->rechokeTimer = tr_timerNew( t->manager->handle, rechokePulse, t, RECHOKE_PERIOD_MSEC );
779}
780
781static void
782rechokeNow( Torrent * t )
783{
784    rechokePulse( t );
785    restartChokeTimer( t );
786}
787
788static int
789rechokeSoonCB( void * vt )
790{
791    Torrent * t = vt;
792    rechokeNow( t );
793    t->rechokeSoonTimer = NULL;
794    return FALSE;
795}
796
797static void
798rechokeSoon( Torrent * t )
799{
800    if( t->rechokeSoonTimer == NULL )
801        t->rechokeSoonTimer = tr_timerNew( t->manager->handle,
802                                           rechokeSoonCB, t, SOON_MSEC );
803}
804
805static void
806msgsCallbackFunc( void * vpeer, void * vevent, void * vt )
807{
808    tr_peer * peer = vpeer;
809    Torrent * t = (Torrent *) vt;
810    const tr_peermsgs_event * e = (const tr_peermsgs_event *) vevent;
811
812    torrentLock( t );
813
814    switch( e->eventType )
815    {
816        case TR_PEERMSG_NEED_REQ:
817            if( t->refillTimer == NULL )
818                t->refillTimer = tr_timerNew( t->manager->handle,
819                                              refillPulse, t,
820                                              REFILL_PERIOD_MSEC );
821            break;
822
823        case TR_PEERMSG_CLIENT_HAVE:
824            broadcastClientHave( t, e->pieceIndex );
825            tr_torrentRecheckCompleteness( t->tor );
826            break;
827
828        case TR_PEERMSG_PEER_PROGRESS: { /* if we're both seeds, then disconnect. */
829#if 0
830            const int clientIsSeed = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
831            const int peerIsSeed = e->progress >= 1.0;
832            if( clientIsSeed && peerIsSeed )
833                peer->doPurge = 1;
834#endif
835            break;
836        }
837
838        case TR_PEERMSG_CLIENT_BLOCK:
839            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
840            break;
841
842        case TR_PEERMSG_GOT_ERROR:
843            peer->doPurge = 1;
844            reconnectSoon( t );
845            break;
846
847        default:
848            assert(0);
849    }
850
851    torrentUnlock( t );
852}
853
854static void
855ensureAtomExists( Torrent * t, const struct in_addr * addr, uint16_t port, uint8_t flags, uint8_t from )
856{
857    if( !peerIsKnown( t, addr ) )
858    {
859        struct peer_atom * a = tr_new( struct peer_atom, 1 );
860        a->addr = *addr;
861        a->port = port;
862        a->flags = flags;
863        a->from = from;
864        a->time = 0;
865fprintf( stderr, "torrent [%s] getting a new atom: %s\n", t->tor->info.name, tr_peerIoAddrStr(&a->addr,a->port) );
866        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
867    }
868}
869
870static void
871myHandshakeDoneCB( tr_handshake    * handshake,
872                   tr_peerIo       * io,
873                   int               isConnected,
874                   const uint8_t   * peer_id,
875                   void            * vmanager )
876{
877    int ok = isConnected;
878    uint16_t port;
879    const struct in_addr * in_addr;
880    tr_peerMgr * manager = (tr_peerMgr*) vmanager;
881    const uint8_t * hash = NULL;
882    Torrent * t;
883    tr_handshake * ours;
884
885    assert( io != NULL );
886    assert( isConnected==0 || isConnected==1 );
887
888    ours = tr_ptrArrayRemoveSorted( manager->handshakes,
889                                    handshake,
890                                    handshakeCompare );
891    assert( ours != NULL );
892    assert( ours == handshake );
893
894    in_addr = tr_peerIoGetAddress( io, &port );
895
896    if( !tr_peerIoHasTorrentHash( io ) ) /* incoming connection gone wrong? */
897    {
898        tr_peerIoFree( io );
899        --manager->connectionCount;
900        return;
901    }
902
903    hash = tr_peerIoGetTorrentHash( io );
904    t = getExistingTorrent( manager, hash );
905
906    if( t != NULL )
907        torrentLock( t );
908
909    if( !t || !t->isRunning )
910    {
911        tr_peerIoFree( io );
912        --manager->connectionCount;
913    }
914    else if( !ok )
915    {
916        /* if we couldn't connect or were snubbed,
917         * the peer's probably not worth remembering. */
918        tr_peer * peer = getExistingPeer( t, in_addr );
919        tr_peerIoFree( io );
920        --manager->connectionCount;
921        if( peer )
922            peer->doPurge = 1;
923    }
924    else /* looking good */
925    {
926        tr_peer * peer = getPeer( t, in_addr );
927        uint16_t port;
928        const struct in_addr * addr = tr_peerIoGetAddress( io,  &port );
929        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
930        if( peer->msgs != NULL ) { /* we already have this peer */
931            tr_peerIoFree( io );
932            --manager->connectionCount;
933        } else {
934            peer->port = port;
935            peer->io = io;
936            peer->msgs = tr_peerMsgsNew( t->tor, peer );
937            tr_free( peer->client );
938            peer->client = peer_id ? tr_clientForId( peer_id ) : NULL;
939            peer->msgsTag = tr_peerMsgsSubscribe( peer->msgs, msgsCallbackFunc, t );
940            rechokeSoon( t );
941        }
942    }
943
944    if( t != NULL )
945        torrentUnlock( t );
946}
947
948static void
949initiateHandshake( tr_peerMgr * manager, tr_peerIo * io )
950{
951    tr_handshake * handshake;
952
953    assert( io != NULL );
954
955    handshake = tr_handshakeNew( io,
956                                 manager->handle->encryptionMode,
957                                 myHandshakeDoneCB,
958                                 manager );
959    ++manager->connectionCount;
960
961    tr_ptrArrayInsertSorted( manager->handshakes, handshake, handshakeCompare );
962}
963
964void
965tr_peerMgrAddIncoming( tr_peerMgr      * manager,
966                       struct in_addr  * addr,
967                       uint16_t          port,
968                       int               socket )
969{
970    managerLock( manager );
971
972    if( getExistingHandshake( manager, addr ) == NULL )
973    {
974        tr_peerIo * io = tr_peerIoNewIncoming( manager->handle, addr, port, socket );
975        initiateHandshake( manager, io );
976    }
977
978    managerUnlock( manager );
979}
980
981void
982tr_peerMgrAddPex( tr_peerMgr     * manager,
983                  const uint8_t  * torrentHash,
984                  int              from,
985                  const tr_pex   * pex,
986                  int              pexCount )
987{
988    Torrent * t;
989    const tr_pex * end;
990
991    managerLock( manager );
992
993    t = getExistingTorrent( manager, torrentHash );
994    for( end=pex+pexCount; pex!=end; ++pex )
995        ensureAtomExists( t, &pex->in_addr, pex->port, pex->flags, from );
996    reconnectSoon( t );
997
998    managerUnlock( manager );
999}
1000
1001void
1002tr_peerMgrAddPeers( tr_peerMgr    * manager,
1003                    const uint8_t * torrentHash,
1004                    int             from,
1005                    const uint8_t * peerCompact,
1006                    int             peerCount )
1007{
1008    int i;
1009    const uint8_t * walk = peerCompact;
1010    Torrent * t;
1011
1012    managerLock( manager );
1013
1014    t = getExistingTorrent( manager, torrentHash );
1015    for( i=0; t!=NULL && i<peerCount; ++i )
1016    {
1017        struct in_addr addr;
1018        uint16_t port;
1019        memcpy( &addr, walk, 4 ); walk += 4;
1020        memcpy( &port, walk, 2 ); walk += 2;
1021        ensureAtomExists( t, &addr, port, 0, from );
1022    }
1023    reconnectSoon( t );
1024
1025    managerUnlock( manager );
1026}
1027
1028/**
1029***
1030**/
1031
1032int
1033tr_peerMgrIsAcceptingConnections( const tr_peerMgr * manager UNUSED )
1034{
1035    return TRUE; /* manager->connectionCount < MAX_CONNECTED_PEERS; */
1036}
1037
1038void
1039tr_peerMgrSetBlame( tr_peerMgr     * manager UNUSED,
1040                    const uint8_t  * torrentHash UNUSED,
1041                    int              pieceIndex UNUSED,
1042                    int              success UNUSED )
1043{
1044    /*fprintf( stderr, "FIXME: tr_peerMgrSetBlame\n" );*/
1045}
1046
1047int
1048tr_pexCompare( const void * va, const void * vb )
1049{
1050    const tr_pex * a = (const tr_pex *) va;
1051    const tr_pex * b = (const tr_pex *) vb;
1052    int i = memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
1053    if( i ) return i;
1054    if( a->port < b->port ) return -1;
1055    if( a->port > b->port ) return 1;
1056    return 0;
1057}
1058
1059int tr_pexCompare( const void * a, const void * b );
1060
1061static int
1062peerPrefersCrypto( const tr_peer * peer )
1063{
1064    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1065        return TRUE;
1066
1067    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1068        return FALSE;
1069
1070    return tr_peerIoIsEncrypted( peer->io );
1071};
1072
1073int
1074tr_peerMgrGetPeers( tr_peerMgr      * manager,
1075                    const uint8_t   * torrentHash,
1076                    tr_pex         ** setme_pex )
1077{
1078    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1079    int i, peerCount;
1080    const tr_peer ** peers;
1081    tr_pex * pex;
1082    tr_pex * walk;
1083
1084    torrentLock( (Torrent*)t );
1085
1086    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1087    pex = walk = tr_new( tr_pex, peerCount );
1088
1089    for( i=0; i<peerCount; ++i, ++walk )
1090    {
1091        const tr_peer * peer = peers[i];
1092
1093        walk->in_addr = peer->in_addr;
1094
1095        walk->port = peer->port;
1096
1097        walk->flags = 0;
1098        if( peerPrefersCrypto(peer) )  walk->flags |= 1;
1099        if( peer->progress >= 1.0 )    walk->flags |= 2;
1100    }
1101
1102    assert( ( walk - pex ) == peerCount );
1103    qsort( pex, peerCount, sizeof(tr_pex), tr_pexCompare );
1104    *setme_pex = pex;
1105
1106    torrentUnlock( (Torrent*)t );
1107
1108    return peerCount;
1109}
1110
1111void
1112tr_peerMgrStartTorrent( tr_peerMgr     * manager,
1113                        const uint8_t  * torrentHash )
1114{
1115    Torrent * t;
1116
1117    managerLock( manager );
1118
1119    t = getExistingTorrent( manager, torrentHash );
1120    t->isRunning = 1;
1121    restartChokeTimer( t );
1122    reconnectSoon( t );
1123
1124    managerUnlock( manager );
1125}
1126
1127static void
1128stopTorrent( Torrent * t )
1129{
1130    int i, size;
1131    tr_peer ** peers;
1132
1133    assert( torrentIsLocked( t ) );
1134
1135    t->isRunning = 0;
1136    tr_timerFree( &t->rechokeTimer );
1137    tr_timerFree( &t->reconnectTimer );
1138
1139    peers = getConnectedPeers( t, &size );
1140    for( i=0; i<size; ++i )
1141        disconnectPeer( peers[i] );
1142
1143    tr_free( peers );
1144}
1145void
1146tr_peerMgrStopTorrent( tr_peerMgr     * manager,
1147                       const uint8_t  * torrentHash)
1148{
1149    managerLock( manager );
1150
1151    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1152
1153    managerUnlock( manager );
1154}
1155
1156void
1157tr_peerMgrAddTorrent( tr_peerMgr * manager,
1158                      tr_torrent * tor )
1159{
1160    Torrent * t;
1161
1162    managerLock( manager );
1163
1164    assert( tor != NULL );
1165    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1166
1167    t = tr_new0( Torrent, 1 );
1168    t->manager = manager;
1169    t->tor = tor;
1170    t->pool = tr_ptrArrayNew( );
1171    t->peers = tr_ptrArrayNew( );
1172    t->requested = tr_bitfieldNew( tor->blockCount );
1173    restartChokeTimer( t );
1174    restartReconnectTimer( t );
1175
1176    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
1177    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1178
1179    managerUnlock( manager );
1180}
1181
1182void
1183tr_peerMgrRemoveTorrent( tr_peerMgr     * manager,
1184                         const uint8_t  * torrentHash )
1185{
1186    Torrent * t;
1187
1188    managerLock( manager );
1189
1190    t = getExistingTorrent( manager, torrentHash );
1191    assert( t != NULL );
1192    stopTorrent( t );
1193    freeTorrent( manager, t );
1194
1195    managerUnlock( manager );
1196}
1197
1198void
1199tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1200                               const uint8_t    * torrentHash,
1201                               int8_t           * tab,
1202                               int                tabCount )
1203{
1204    int i;
1205    const Torrent * t;
1206    const tr_torrent * tor;
1207    float interval;
1208
1209    managerLock( (tr_peerMgr*)manager );
1210
1211    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1212    tor = t->tor;
1213    interval = tor->info.pieceCount / (float)tabCount;
1214
1215    memset( tab, 0, tabCount );
1216
1217    for( i=0; i<tabCount; ++i )
1218    {
1219        const int piece = i * interval;
1220
1221        if( tor == NULL )
1222            tab[i] = 0;
1223        else if( tr_cpPieceIsComplete( tor->completion, piece ) )
1224            tab[i] = -1;
1225        else {
1226            int j, peerCount;
1227            const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1228            for( j=0; j<peerCount; ++j )
1229                if( tr_bitfieldHas( peers[j]->have, i ) )
1230                    ++tab[i];
1231        }
1232    }
1233
1234    managerUnlock( (tr_peerMgr*)manager );
1235}
1236
1237/* Returns the pieces that we and/or a connected peer has */
1238tr_bitfield*
1239tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1240                        const uint8_t    * torrentHash )
1241{
1242    int i, size;
1243    const Torrent * t;
1244    const tr_peer ** peers;
1245    tr_bitfield * pieces;
1246
1247    managerLock( (tr_peerMgr*)manager );
1248
1249    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1250    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1251    pieces = tr_bitfieldDup( tr_cpPieceBitfield( t->tor->completion ) );
1252    for( i=0; i<size; ++i )
1253        if( peers[i]->io != NULL )
1254            tr_bitfieldAnd( pieces, peers[i]->have );
1255
1256    managerUnlock( (tr_peerMgr*)manager );
1257    return pieces;
1258}
1259
1260void
1261tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1262                        const uint8_t    * torrentHash,
1263                        int              * setmePeersTotal,
1264                        int              * setmePeersConnected,
1265                        int              * setmePeersSendingToUs,
1266                        int              * setmePeersGettingFromUs,
1267                        int              * setmePeersFrom )
1268{
1269    int i, size;
1270    const Torrent * t;
1271    const tr_peer ** peers;
1272
1273    managerLock( (tr_peerMgr*)manager );
1274
1275    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1276    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1277
1278    *setmePeersTotal          = size;
1279    *setmePeersConnected      = 0;
1280    *setmePeersSendingToUs    = 0;
1281    *setmePeersGettingFromUs  = 0;
1282
1283    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1284        setmePeersFrom[i] = 0;
1285
1286    for( i=0; i<size; ++i )
1287    {
1288        const tr_peer * peer = peers[i];
1289
1290        if( peer->io == NULL ) /* not connected */
1291            continue;
1292
1293        ++*setmePeersConnected;
1294
1295        ++setmePeersFrom[peer->from];
1296
1297        if( tr_peerIoGetRateToPeer( peer->io ) > 0.01 )
1298            ++*setmePeersGettingFromUs;
1299
1300        if( tr_peerIoGetRateToClient( peer->io ) > 0.01 )
1301            ++*setmePeersSendingToUs;
1302    }
1303
1304    managerUnlock( (tr_peerMgr*)manager );
1305}
1306
1307struct tr_peer_stat *
1308tr_peerMgrPeerStats( const tr_peerMgr  * manager,
1309                     const uint8_t     * torrentHash,
1310                     int               * setmeCount UNUSED )
1311{
1312    int i, size;
1313    const Torrent * t;
1314    const tr_peer ** peers;
1315    tr_peer_stat * ret;
1316
1317    assert( manager != NULL );
1318    managerLock( (tr_peerMgr*)manager );
1319
1320    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1321    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1322
1323    ret = tr_new0( tr_peer_stat, size );
1324
1325    for( i=0; i<size; ++i )
1326    {
1327        const tr_peer * peer = peers[i];
1328        const int live = peer->io != NULL;
1329        tr_peer_stat * stat = ret + i;
1330
1331        tr_netNtop( &peer->in_addr, stat->addr, sizeof(stat->addr) );
1332        stat->port             = peer->port;
1333        stat->from             = peer->from;
1334        stat->client           = peer->client;
1335        stat->progress         = peer->progress;
1336        stat->isConnected      = live ? 1 : 0;
1337        stat->isEncrypted      = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1338        stat->uploadToRate     = tr_peerIoGetRateToPeer( peer->io );
1339        stat->downloadFromRate = tr_peerIoGetRateToClient( peer->io );
1340        stat->isDownloading    = stat->uploadToRate > 0.01;
1341        stat->isUploading      = stat->downloadFromRate > 0.01;
1342    }
1343
1344    *setmeCount = size;
1345
1346    managerUnlock( (tr_peerMgr*)manager );
1347    return ret;
1348}
1349
1350/**
1351***
1352**/
1353
1354typedef struct
1355{
1356    tr_peer * peer;
1357    float rate;
1358    int randomKey;
1359    int preferred;
1360    int doUnchoke;
1361}
1362ChokeData;
1363
1364static int
1365compareChoke( const void * va, const void * vb )
1366{
1367    const ChokeData * a = ( const ChokeData * ) va;
1368    const ChokeData * b = ( const ChokeData * ) vb;
1369
1370    if( a->preferred != b->preferred )
1371        return a->preferred ? -1 : 1;
1372
1373    if( a->preferred )
1374    {
1375        if( a->rate > b->rate ) return -1;
1376        if( a->rate < b->rate ) return 1;
1377        return 0;
1378    }
1379    else
1380    {
1381        return a->randomKey - b->randomKey;
1382    }
1383}
1384
1385static int
1386clientIsSnubbedBy( const tr_peer * peer )
1387{
1388    assert( peer != NULL );
1389
1390    return peer->peerSentPieceDataAt < (time(NULL) - SNUBBED_SEC);
1391}
1392
1393/**
1394***
1395**/
1396
1397static void
1398rechokeLeech( Torrent * t )
1399{
1400    int i, peerCount, size=0, unchoked=0;
1401    const time_t ignorePeersNewerThan = time(NULL) - MIN_CHOKE_PERIOD_SEC;
1402    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1403    ChokeData * choke = tr_new0( ChokeData, peerCount );
1404
1405    assert( torrentIsLocked( t ) );
1406   
1407    /* sort the peers by preference and rate */
1408    for( i=0; i<peerCount; ++i )
1409    {
1410        tr_peer * peer = peers[i];
1411        ChokeData * node;
1412        if( peer->chokeChangedAt > ignorePeersNewerThan )
1413            continue;
1414
1415        node = &choke[size++];
1416        node->peer = peer;
1417        node->preferred = peer->peerIsInterested && !clientIsSnubbedBy(peer);
1418        node->randomKey = tr_rand( INT_MAX );
1419        node->rate = tr_peerIoGetRateToClient( peer->io );
1420    }
1421
1422    qsort( choke, size, sizeof(ChokeData), compareChoke );
1423
1424    for( i=0; i<size && i<NUM_UNCHOKED_PEERS_PER_TORRENT; ++i ) {
1425        choke[i].doUnchoke = 1;
1426        ++unchoked;
1427    }
1428
1429    for( ; i<size; ++i ) {
1430        choke[i].doUnchoke = 1;
1431        ++unchoked;
1432        if( choke[i].peer->peerIsInterested )
1433            break;
1434    }
1435
1436    for( i=0; i<size; ++i )
1437        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1438
1439    /* cleanup */
1440    tr_free( choke );
1441    tr_free( peers );
1442}
1443
1444static void
1445rechokeSeed( Torrent * t )
1446{
1447    int i, size;
1448    tr_peer ** peers;
1449
1450    assert( torrentIsLocked( t ) );
1451
1452    peers = getConnectedPeers( t, &size );
1453
1454    /* FIXME */
1455    for( i=0; i<size; ++i )
1456        tr_peerMsgsSetChoke( peers[i]->msgs, FALSE );
1457
1458    tr_free( peers );
1459}
1460
1461static int
1462rechokePulse( void * vtorrent )
1463{
1464    Torrent * t = vtorrent;
1465    torrentLock( t );
1466
1467    const int done = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1468    if( done )
1469        rechokeLeech( vtorrent );
1470    else
1471        rechokeSeed( vtorrent );
1472
1473    torrentUnlock( t );
1474    return TRUE;
1475}
1476
1477/***
1478****
1479****
1480****
1481***/
1482
1483struct tr_connection
1484{
1485    tr_peer * peer;
1486    double throughput;
1487};
1488
1489#define LAISSEZ_FAIRE_PERIOD_SECS 60
1490
1491static int
1492compareConnections( const void * va, const void * vb )
1493{
1494    const struct tr_connection * a = va;
1495    const struct tr_connection * b = vb;
1496    if( a->throughput < b->throughput ) return -1;
1497    if( a->throughput > b->throughput ) return 1;
1498    return 0;
1499}
1500
1501static struct tr_connection *
1502getWeakConnections( Torrent * t, int * setmeSize )
1503{
1504    int i, insize, outsize;
1505    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( t->peers, &insize );
1506    struct tr_connection * ret = tr_new( struct tr_connection, insize );
1507    const int clientIsSeed = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1508    const time_t now = time( NULL );
1509
1510    assert( torrentIsLocked( t ) );
1511
1512    for( i=outsize=0; i<insize; ++i )
1513    {
1514        tr_peer * peer = peers[i];
1515        int isWeak;
1516        const int peerIsSeed = peer->progress >= 1.0;
1517        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1518        const double throughput = (2*tr_peerIoGetRateToPeer( peer->io ))
1519                                + tr_peerIoGetRateToClient( peer->io );
1520
1521        assert( atom != NULL );
1522
1523        /* if we're both seeds, give a little bit of time for
1524         * a mutual pex -- peer-msgs initiates a pex exchange
1525         * on startup -- and then disconnect */
1526        if( peerIsSeed && clientIsSeed && (now-atom->time >= 30) )
1527            isWeak = TRUE;
1528        else if( ( now - atom->time ) < LAISSEZ_FAIRE_PERIOD_SECS )
1529            isWeak = FALSE;
1530        else if( throughput >= 5 )
1531            isWeak = FALSE;
1532        else
1533            isWeak = TRUE;
1534
1535        if( isWeak )
1536        {
1537            ret[outsize].peer = peer;
1538            ret[outsize].throughput = throughput;
1539            ++outsize;
1540        }
1541    }
1542
1543    qsort( ret, outsize, sizeof(struct tr_connection), compareConnections );
1544    *setmeSize = outsize;
1545    return ret;
1546}
1547
1548static int
1549compareAtomByTime( const void * va, const void * vb )
1550{
1551    const struct peer_atom * a = * (const struct peer_atom**) va;
1552    const struct peer_atom * b = * (const struct peer_atom**) vb;
1553    if( a->time < b->time ) return -1;
1554    if( a->time > b->time ) return 1;
1555    return 0;
1556}
1557
1558static struct peer_atom **
1559getPeerCandidates( Torrent * t, int * setmeSize )
1560{
1561    int i, insize, outsize;
1562    struct peer_atom ** atoms;
1563    struct peer_atom ** ret;
1564    const time_t now = time( NULL );
1565    const int seed = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1566
1567    assert( torrentIsLocked( t ) );
1568
1569    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &insize );
1570    ret = tr_new( struct peer_atom*, insize );
1571    for( i=outsize=0; i<insize; ++i )
1572    {
1573        struct peer_atom * atom = atoms[i];
1574
1575        /* we don't need two connections to the same peer... */
1576        if( peerIsInUse( t, &atom->addr ) ) {
1577            fprintf( stderr, "RECONNECT peer %d (%s) is in use...\n", i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1578            continue;
1579        }
1580
1581        /* no need to connect if we're both seeds... */
1582        if( seed && ( atom->flags & 2 ) ) {
1583            fprintf( stderr, "RECONNECT peer %d (%s) is a seed and so are we...\n", i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1584            continue;
1585        }
1586
1587        /* if we used this peer recently, give someone else a turn */
1588        if( ( now - atom->time ) <  LAISSEZ_FAIRE_PERIOD_SECS ) {
1589            fprintf( stderr, "RECONNECT peer %d (%s) is in its grace period...\n", i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1590            continue;
1591        }
1592
1593        ret[outsize++] = atom;
1594    }
1595
1596    qsort( ret, outsize, sizeof(struct peer_atom*), compareAtomByTime );
1597    *setmeSize = outsize;
1598    return ret;
1599}
1600
1601static int
1602reconnectPulse( void * vtorrent )
1603{
1604    Torrent * t = vtorrent;
1605    struct peer_atom ** candidates;
1606    struct tr_connection * connections;
1607    int i, nCandidates, nConnections, nCull, nAdd;
1608    int peerCount;
1609
1610    torrentLock( t );
1611
1612    connections = getWeakConnections( t, &nConnections );
1613    candidates = getPeerCandidates( t, &nCandidates );
1614
1615    /* figure out how many peers to disconnect */
1616    nCull = nConnections-4; 
1617
1618fprintf( stderr, "RECONNECT pulse for [%s]: %d connections, %d candidates, %d atoms, %d cull\n", t->tor->info.name, nConnections, nCandidates, tr_ptrArraySize(t->pool), nCull );
1619
1620for( i=0; i<nConnections; ++i )
1621fprintf( stderr, "connection #%d: %s @ %.2f\n", i+1, tr_peerIoAddrStr( &connections[i].peer->in_addr, connections[i].peer->port ), connections[i].throughput );
1622
1623    /* disconnect some peers */
1624    for( i=0; i<nCull && i<nConnections; ++i ) {
1625        const double throughput = connections[i].throughput;
1626        tr_peer * peer = connections[i].peer;
1627        fprintf( stderr, "RECONNECT culling peer %s, whose throughput was %f\n", tr_peerIoAddrStr(&peer->in_addr, peer->port), throughput );
1628        removePeer( t, peer );
1629    }
1630
1631    /* add some new ones */
1632    peerCount = tr_ptrArraySize( t->peers );
1633    nAdd = MAX_CONNECTED_PEERS_PER_TORRENT - peerCount;
1634    for( i=0; i<nAdd && i<nCandidates; ++i ) {
1635        struct peer_atom * atom = candidates[i];
1636        tr_peerIo * io = tr_peerIoNewOutgoing( t->manager->handle, &atom->addr, atom->port, t->hash );
1637fprintf( stderr, "RECONNECT adding an outgoing connection...\n" );
1638        initiateHandshake( t->manager, io );
1639        atom->time = time( NULL );
1640    }
1641
1642    /* cleanup */
1643    tr_free( connections );
1644    tr_free( candidates );
1645    torrentUnlock( t );
1646    return TRUE;
1647}
Note: See TracBrowser for help on using the repository browser.