source: trunk/libtransmission/peer-mgr.c @ 3245

Last change on this file since 3245 was 3245, checked in by charles, 15 years ago

more experimenting with how to get good peers

  • Property svn:keywords set to Date Rev Author Id
File size: 43.7 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 3245 2007-10-01 03:24:52Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17#include <limits.h> /* INT_MAX */
18
19#include "transmission.h"
20#include "clients.h"
21#include "completion.h"
22#include "crypto.h"
23#include "handshake.h"
24#include "net.h"
25#include "peer-io.h"
26#include "peer-mgr.h"
27#include "peer-mgr-private.h"
28#include "peer-msgs.h"
29#include "platform.h"
30#include "ptrarray.h"
31#include "ratecontrol.h"
32#include "trevent.h"
33#include "utils.h"
34
35#include "pthread.h"
36
37enum
38{
39    /* how frequently to change which peers are choked */
40    RECHOKE_PERIOD_MSEC = (15 * 1000),
41
42    /* how frequently to decide which peers live and die */
43    RECONNECT_PERIOD_MSEC = (15 * 1000),
44
45    /* how frequently to refill peers' request lists */
46    REFILL_PERIOD_MSEC = 1000,
47
48    /* how many peers to unchoke per-torrent. */
49    /* FIXME: make this user-configurable? */
50    NUM_UNCHOKED_PEERS_PER_TORRENT = 8,
51
52    /* don't change a peer's choke status more often than this */
53    MIN_CHOKE_PERIOD_SEC = 10,
54
55    /* how soon is `soon' in the rechokeSoon, reconnecSoon funcs */
56    SOON_MSEC = 1000,
57
58    /* following the BT spec, we consider ourselves `snubbed' if
59     * we're we don't get piece data from a peer in this long */
60    SNUBBED_SEC = 60,
61
62    /* this is arbitrary and, hopefully, temporary until we come up
63     * with a better idea for managing the connection limits */
64    MAX_CONNECTED_PEERS_PER_TORRENT = 60,
65};
66
67/**
68***
69**/
70
71/* We keep one of these for every peer we know about, whether
72 * it's connected or not, so the struct must be small.
73 * When our current connections underperform, we dip back
74 * int this list for new ones. */
75struct peer_atom
76{   
77    uint8_t from;
78    uint8_t flags; /* these match the added_f flags */
79    uint16_t port;
80    struct in_addr addr; 
81    time_t time;
82};
83
84typedef struct
85{
86    uint8_t hash[SHA_DIGEST_LENGTH];
87    tr_ptrArray * pool; /* struct peer_atom */
88    tr_ptrArray * peers; /* tr_peer */
89    tr_timer * reconnectTimer;
90    tr_timer * reconnectSoonTimer;
91    tr_timer * rechokeTimer;
92    tr_timer * rechokeSoonTimer;
93    tr_timer * refillTimer;
94    tr_torrent * tor;
95    tr_bitfield * requested;
96
97    unsigned int isRunning : 1;
98
99    struct tr_peerMgr * manager;
100}
101Torrent;
102
103struct tr_peerMgr
104{
105    tr_handle * handle;
106    tr_ptrArray * torrents; /* Torrent */
107    int connectionCount;
108    tr_ptrArray * handshakes; /* in-process */
109    tr_lock * lock;
110    pthread_t lockThread;
111};
112
113/**
114***
115**/
116
117static void
118managerLock( struct tr_peerMgr * manager )
119{
120    assert( manager->lockThread != pthread_self() );
121    tr_lockLock( manager->lock );
122    manager->lockThread = pthread_self();
123}
124static void
125managerUnlock( struct tr_peerMgr * manager )
126{
127    assert( manager->lockThread == pthread_self() );
128    manager->lockThread = 0;
129    tr_lockUnlock( manager->lock );
130}
131static void
132torrentLock( Torrent * torrent )
133{
134    managerLock( torrent->manager );
135}
136static void
137torrentUnlock( Torrent * torrent )
138{
139    managerUnlock( torrent->manager );
140}
141static int
142torrentIsLocked( const Torrent * t )
143{
144    return ( t != NULL )
145        && ( t->manager != NULL )
146        && ( t->manager->lockThread != 0 )
147        && ( t->manager->lockThread == pthread_self( ) );
148}
149
150/**
151***
152**/
153
154static int
155compareAddresses( const struct in_addr * a, const struct in_addr * b )
156{
157    return tr_compareUint32( a->s_addr, b->s_addr );
158}
159
160static int
161handshakeCompareToAddr( const void * va, const void * vb )
162{
163    const tr_handshake * a = va;
164    return compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
165}
166
167static int
168handshakeCompare( const void * a, const void * b )
169{
170    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
171}
172
173static tr_handshake*
174getExistingHandshake( tr_peerMgr * mgr, const struct in_addr * in_addr )
175{
176    return tr_ptrArrayFindSorted( mgr->handshakes,
177                                  in_addr,
178                                  handshakeCompareToAddr );
179}
180
181static int
182comparePeerAtomToAddress( const void * va, const void * vb )
183{
184    const struct peer_atom * a = va;
185    return compareAddresses( &a->addr, vb );
186}
187
188static int
189comparePeerAtoms( const void * va, const void * vb )
190{
191    const struct peer_atom * b = vb;
192    return comparePeerAtomToAddress( va, &b->addr );
193}
194
195/**
196***
197**/
198
199static int
200torrentCompare( const void * va, const void * vb )
201{
202    const Torrent * a = va;
203    const Torrent * b = vb;
204    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
205}
206
207static int
208torrentCompareToHash( const void * va, const void * vb )
209{
210    const Torrent * a = (const Torrent*) va;
211    const uint8_t * b_hash = (const uint8_t*) vb;
212    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
213}
214
215static Torrent*
216getExistingTorrent( tr_peerMgr * manager, const uint8_t * hash )
217{
218    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
219                                             hash,
220                                             torrentCompareToHash );
221}
222
223static int
224peerCompare( const void * va, const void * vb )
225{
226    const tr_peer * a = (const tr_peer *) va;
227    const tr_peer * b = (const tr_peer *) vb;
228    return compareAddresses( &a->in_addr, &b->in_addr );
229}
230
231static int
232peerCompareToAddr( const void * va, const void * vb )
233{
234    const tr_peer * a = (const tr_peer *) va;
235    return compareAddresses( &a->in_addr, vb );
236}
237
238static tr_peer*
239getExistingPeer( Torrent * torrent, const struct in_addr * in_addr )
240{
241    assert( torrentIsLocked( torrent ) );
242    assert( in_addr != NULL );
243
244    return (tr_peer*) tr_ptrArrayFindSorted( torrent->peers,
245                                             in_addr,
246                                             peerCompareToAddr );
247}
248
249static struct peer_atom*
250getExistingAtom( const Torrent * t, const struct in_addr * addr )
251{
252    assert( torrentIsLocked( t ) );
253    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
254}
255
256static int
257peerIsKnown( const Torrent * t, const struct in_addr * addr )
258{
259    return getExistingAtom( t, addr ) != NULL;
260}
261
262static int
263peerIsInUse( const Torrent * t, const struct in_addr * addr )
264{
265    assert( torrentIsLocked ( t ) );
266
267    return ( getExistingPeer( (Torrent*)t, addr ) != NULL )
268        || ( getExistingHandshake( ((Torrent*)t)->manager, addr ) != NULL );
269}
270
271static tr_peer*
272getPeer( Torrent * torrent, const struct in_addr * in_addr )
273{
274    tr_peer * peer;
275
276    assert( torrentIsLocked( torrent ) );
277
278    peer = getExistingPeer( torrent, in_addr );
279
280    if( peer == NULL )
281    {
282        peer = tr_new0( tr_peer, 1 );
283        memcpy( &peer->in_addr, in_addr, sizeof(struct in_addr) );
284        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
285    }
286
287    return peer;
288}
289
290static void
291disconnectPeer( tr_peer * peer )
292{
293    assert( peer != NULL );
294
295    tr_peerIoFree( peer->io );
296    peer->io = NULL;
297
298    if( peer->msgs != NULL )
299    {
300        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
301        tr_peerMsgsFree( peer->msgs );
302        peer->msgs = NULL;
303    }
304
305    tr_bitfieldFree( peer->have );
306    peer->have = NULL;
307
308    tr_bitfieldFree( peer->blame );
309    peer->blame = NULL;
310
311    tr_bitfieldFree( peer->banned );
312    peer->banned = NULL;
313}
314
315static void
316freePeer( tr_peer * peer )
317{
318    disconnectPeer( peer );
319    tr_free( peer->client );
320    tr_free( peer );
321}
322
323static void
324removePeer( Torrent * t, tr_peer * peer )
325{
326    tr_peer * removed;
327    struct peer_atom * atom;
328
329    assert( torrentIsLocked( t ) );
330
331    atom = getExistingAtom( t, &peer->in_addr );
332    assert( atom != NULL );
333    atom->time = time( NULL );
334
335    removed = tr_ptrArrayRemoveSorted  ( t->peers, peer, peerCompare );
336    assert( removed == peer );
337    freePeer( removed );
338}
339
340static void
341freeTorrent( tr_peerMgr * manager, Torrent * t )
342{
343    uint8_t hash[SHA_DIGEST_LENGTH];
344
345    assert( t != NULL );
346    assert( t->peers != NULL );
347    assert( torrentIsLocked( t ) );
348    assert( getExistingTorrent( manager, t->hash ) != NULL );
349
350    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
351
352    tr_timerFree( &t->reconnectTimer );
353    tr_timerFree( &t->reconnectSoonTimer );
354    tr_timerFree( &t->rechokeTimer );
355    tr_timerFree( &t->rechokeSoonTimer );
356    tr_timerFree( &t->refillTimer );
357
358    tr_bitfieldFree( t->requested );
359    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
360    tr_ptrArrayFree( t->peers, (PtrArrayForeachFunc)freePeer );
361    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
362    tr_free( t );
363
364    assert( getExistingTorrent( manager, hash ) == NULL );
365}
366
367/**
368***
369**/
370
371struct tr_bitfield *
372tr_peerMgrGenerateAllowedSet( const uint32_t         setCount,
373                              const uint32_t         pieceCount,
374                              const uint8_t          infohash[20],
375                              const struct in_addr * ip )
376{
377    /* This has been checked against the spec example implementation. Feeding it with :
378    setCount = 9, pieceCount = 1313, infohash = Oxaa,0xaa,...0xaa, ip = 80.4.4.200
379generate :
380    1059, 431, 808, 1217, 287, 376, 1188, 353, 508
381    but since we're storing in a bitfield, it won't be in this order... */
382    /* TODO : We should translate link-local IPv4 adresses to external IP,
383     * so that being on same local network gives us the same allowed pieces */
384   
385    printf( "%d piece allowed fast set for torrent with %d pieces and hex infohash\n", setCount, pieceCount );
386    printf( "%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x for node with IP %s:\n",
387            infohash[0], infohash[1], infohash[2], infohash[3], infohash[4], infohash[5], infohash[6], infohash[7], infohash[8], infohash[9],
388            infohash[10], infohash[11], infohash[12], infohash[13], infohash[14], infohash[15], infohash[16], infohash[7], infohash[18], infohash[19],
389            inet_ntoa( *ip ) );
390   
391    uint8_t *seed = malloc(4 + SHA_DIGEST_LENGTH);
392    char buf[4];
393    uint32_t allowedPieceCount = 0;
394    tr_bitfield_t * ret;
395   
396    ret = tr_bitfieldNew( pieceCount );
397   
398    /* We need a seed based on most significant bytes of peer address
399        concatenated with torrent's infohash */
400    *(uint32_t*)buf = ntohl( htonl(ip->s_addr) & 0xffffff00 );
401   
402    memcpy( seed, &buf, 4 );
403    memcpy( seed + 4, infohash, SHA_DIGEST_LENGTH );
404   
405    tr_sha1( seed, seed, 4 + SHA_DIGEST_LENGTH, NULL );
406   
407    while ( allowedPieceCount < setCount )
408    {
409        int i;
410        for ( i = 0 ; i < 5 && allowedPieceCount < setCount ; i++ )
411        {
412            /* We generate indices from 4-byte chunks of the seed */
413            uint32_t j = i * 4;
414            uint32_t y = ntohl( *(uint32_t*)(seed + j) );
415            uint32_t index = y % pieceCount;
416           
417            if ( !tr_bitfieldHas( ret, index ) )
418            {
419                tr_bitfieldAdd( ret, index );
420                allowedPieceCount++;
421            }
422        }
423        /* We randomize the seed, in case we need to iterate more */
424        tr_sha1( seed, seed, SHA_DIGEST_LENGTH, NULL );
425    }
426    tr_free( seed );
427   
428    return ret;
429}
430
431tr_peerMgr*
432tr_peerMgrNew( tr_handle * handle )
433{
434    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
435    m->handle = handle;
436    m->torrents = tr_ptrArrayNew( );
437    m->handshakes = tr_ptrArrayNew( );
438    m->lock = tr_lockNew( );
439    return m;
440}
441
442void
443tr_peerMgrFree( tr_peerMgr * manager )
444{
445    managerLock( manager );
446
447    tr_ptrArrayFree( manager->handshakes, (PtrArrayForeachFunc)tr_handshakeAbort );
448    tr_ptrArrayFree( manager->torrents, (PtrArrayForeachFunc)freeTorrent );
449
450    managerUnlock( manager );
451    tr_lockFree( manager->lock );
452    tr_free( manager );
453}
454
455static tr_peer**
456getConnectedPeers( Torrent * t, int * setmeCount )
457{
458    int i, peerCount, connectionCount;
459    tr_peer **peers;
460    tr_peer **ret;
461
462    assert( torrentIsLocked( t ) );
463
464    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
465    ret = tr_new( tr_peer*, peerCount );
466
467    for( i=connectionCount=0; i<peerCount; ++i )
468        if( peers[i]->msgs != NULL )
469            ret[connectionCount++] = peers[i];
470
471    *setmeCount = connectionCount;
472    return ret;
473}
474
475/***
476****  Refill
477***/
478
479struct tr_refill_piece
480{
481    tr_priority_t priority;
482    uint32_t piece;
483    uint32_t peerCount;
484    uint32_t fastAllowed;
485};
486
487static int
488compareRefillPiece (const void * aIn, const void * bIn)
489{
490    const struct tr_refill_piece * a = aIn;
491    const struct tr_refill_piece * b = bIn;
492
493    /* if one piece has a higher priority, it goes first */
494    if (a->priority != b->priority)
495        return a->priority > b->priority ? -1 : 1;
496
497    /* otherwise if one has fewer peers, it goes first */
498    if (a->peerCount != b->peerCount)
499        return a->peerCount < b->peerCount ? -1 : 1;
500   
501    /* otherwise if one *might be* fastallowed to us */
502    if (a->fastAllowed != b->fastAllowed)
503        return a->fastAllowed < b->fastAllowed ? -1 : 1;
504
505    /* otherwise go with the earlier piece */
506    return a->piece - b->piece;
507}
508
509static int
510isPieceInteresting( const tr_torrent  * tor,
511                    int                 piece )
512{
513    if( tor->info.pieces[piece].dnd ) /* we don't want it */
514        return 0;
515
516    if( tr_cpPieceIsComplete( tor->completion, piece ) ) /* we already have it */
517        return 0;
518
519    return 1;
520}
521
522static uint32_t*
523getPreferredPieces( Torrent     * t,
524                    uint32_t    * pieceCount )
525{
526    const tr_torrent * tor = t->tor;
527    const tr_info * inf = &tor->info;
528    int i;
529    uint32_t poolSize = 0;
530    uint32_t * pool = tr_new( uint32_t, inf->pieceCount );
531    int peerCount;
532    tr_peer** peers;
533
534    assert( torrentIsLocked( t ) );
535
536    peers = getConnectedPeers( t, &peerCount );
537
538    for( i=0; i<inf->pieceCount; ++i )
539        if( isPieceInteresting( tor, i ) )
540            pool[poolSize++] = i;
541
542    /* sort the pool from most interesting to least... */
543    if( poolSize > 1 )
544    {
545        uint32_t j;
546        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
547
548        for( j=0; j<poolSize; ++j )
549        {
550            int k;
551            const int piece = pool[j];
552            struct tr_refill_piece * setme = p + j;
553
554            setme->piece = piece;
555            setme->priority = inf->pieces[piece].priority;
556            setme->peerCount = 0;
557            setme->fastAllowed = 0;
558            /* FIXME */
559//            setme->fastAllowed = tr_bitfieldHas( t->tor->allowedList, i);
560
561            for( k=0; k<peerCount; ++k ) {
562                const tr_peer * peer = peers[k];
563                if( peer->peerIsInterested && !peer->clientIsChoked && tr_bitfieldHas( peer->have, piece ) )
564                    ++setme->peerCount;
565            }
566        }
567
568        qsort (p, poolSize, sizeof(struct tr_refill_piece), compareRefillPiece);
569
570        for( j=0; j<poolSize; ++j )
571            pool[j] = p[j].piece;
572
573        tr_free( p );
574    }
575
576#if 0
577fprintf (stderr, "new pool: ");
578for (i=0; i<15 && i<(int)poolSize; ++i ) fprintf (stderr, "%d, ", (int)pool[i] );
579fprintf (stderr, "\n");
580#endif
581    tr_free( peers );
582
583    *pieceCount = poolSize;
584    return pool;
585}
586
587static uint64_t*
588getPreferredBlocks( Torrent * t, uint64_t * setmeCount )
589{
590    uint32_t i;
591    uint32_t pieceCount;
592    uint32_t * pieces;
593    uint64_t *req, *unreq, *ret, *walk;
594    int reqCount, unreqCount;
595    const tr_torrent * tor = t->tor;
596
597    assert( torrentIsLocked( t ) );
598
599    pieces = getPreferredPieces( t, &pieceCount );
600/*fprintf( stderr, "REFILL refillPulse for {%s} got %d of %d pieces\n", tor->info.name, (int)pieceCount, t->tor->info.pieceCount );*/
601
602    req = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
603    reqCount = 0;
604    unreq = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
605    unreqCount = 0;
606
607    for( i=0; i<pieceCount; ++i ) {
608        const uint32_t index = pieces[i];
609        const int begin = tr_torPieceFirstBlock( tor, index );
610        const int end = begin + tr_torPieceCountBlocks( tor, (int)index );
611        int block;
612        for( block=begin; block<end; ++block )
613            if( tr_cpBlockIsComplete( tor->completion, block ) )
614                continue;
615            else if( tr_bitfieldHas( t->requested, block ) )
616                req[reqCount++] = block;
617            else
618                unreq[unreqCount++] = block;
619    }
620
621/*fprintf( stderr, "REFILL refillPulse for {%s} reqCount is %d, unreqCount is %d\n", tor->info.name, (int)reqCount, (int)unreqCount );*/
622    ret = walk = tr_new( uint64_t, unreqCount + reqCount );
623    memcpy( walk, unreq, sizeof(uint64_t) * unreqCount );
624    walk += unreqCount;
625    memcpy( walk, req, sizeof(uint64_t) * reqCount );
626    walk += reqCount;
627    assert( ( walk - ret ) == ( unreqCount + reqCount ) );
628    *setmeCount = walk - ret;
629
630    tr_free( req );
631    tr_free( unreq );
632    tr_free( pieces );
633
634    return ret;
635}
636
637static int
638refillPulse( void * vtorrent )
639{
640    Torrent * t = vtorrent;
641    tr_torrent * tor = t->tor;
642    uint32_t i;
643    int peerCount;
644    tr_peer ** peers;
645    uint64_t blockCount;
646    uint64_t * blocks;
647
648    if( !t->isRunning )
649        return TRUE;
650    if( tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE )
651        return TRUE;
652
653    torrentLock( t );
654
655    blocks = getPreferredBlocks( t, &blockCount );
656    peers = getConnectedPeers( t, &peerCount );
657
658/*fprintf( stderr, "REFILL refillPulse for {%s} got %d blocks\n", tor->info.name, (int)blockCount );*/
659
660    for( i=0; peerCount && i<blockCount; ++i )
661    {
662        const int block = blocks[i];
663        const uint32_t index = tr_torBlockPiece( tor, block );
664        const uint32_t begin = (block * tor->blockSize) - (index * tor->info.pieceSize);
665        const uint32_t length = tr_torBlockCountBytes( tor, block );
666        int j;
667        assert( _tr_block( tor, index, begin ) == block );
668        assert( begin < (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
669        assert( (begin + length) <= (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
670
671
672        /* find a peer who can ask for this block */
673        for( j=0; j<peerCount; )
674        {
675            const int val = tr_peerMsgsAddRequest( peers[j]->msgs, index, begin, length );
676            switch( val )
677            {
678                case TR_ADDREQ_FULL: 
679                case TR_ADDREQ_CLIENT_CHOKED:
680                    memmove( peers+j, peers+j+1, sizeof(tr_peer*)*(--peerCount-j) );
681                    break;
682
683                case TR_ADDREQ_MISSING: 
684                    ++j;
685                    break;
686
687                case TR_ADDREQ_OK:
688                    /*fprintf( stderr, "REFILL peer %p took the request for block %d\n", peers[j]->msgs, block );*/
689                    tr_bitfieldAdd( t->requested, block );
690                    j = peerCount;
691                    break;
692
693                default:
694                    assert( 0 && "unhandled value" );
695                    break;
696            }
697        }
698    }
699
700    /* cleanup */
701    tr_free( peers );
702    tr_free( blocks );
703
704    t->refillTimer = NULL;
705
706    torrentUnlock( t );
707    return FALSE;
708}
709
710static void
711broadcastClientHave( Torrent * t, uint32_t index )
712{
713    int i, size;
714    tr_peer ** peers;
715
716    assert( torrentIsLocked( t ) );
717
718    peers = getConnectedPeers( t, &size );
719    for( i=0; i<size; ++i )
720        tr_peerMsgsHave( peers[i]->msgs, index );
721    tr_free( peers );
722}
723
724static void
725broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
726{
727    int i, size;
728    tr_peer ** peers;
729
730    assert( torrentIsLocked( t ) );
731
732    peers = getConnectedPeers( t, &size );
733    for( i=0; i<size; ++i )
734        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
735    tr_free( peers );
736}
737
738/**
739***
740**/
741
742static int reconnectPulse( void * vtorrent );
743
744static void
745restartReconnectTimer( Torrent * t )
746{
747    tr_timerFree( &t->reconnectTimer );
748    if( t->isRunning )
749        t->reconnectTimer = tr_timerNew( t->manager->handle, reconnectPulse, t, RECONNECT_PERIOD_MSEC );
750}
751
752static void
753reconnectNow( Torrent * t )
754{
755    reconnectPulse( t );
756    restartReconnectTimer( t );
757}
758
759static int
760reconnectSoonCB( void * vt )
761{
762    Torrent * t = vt;
763    reconnectNow( t );
764    t->reconnectSoonTimer = NULL;
765    return FALSE;
766}
767
768static void
769reconnectSoon( Torrent * t )
770{
771    if( t->reconnectSoonTimer == NULL )
772        t->reconnectSoonTimer = tr_timerNew( t->manager->handle,
773                                             reconnectSoonCB, t, SOON_MSEC );
774}
775
776/**
777***
778**/
779
780static int rechokePulse( void * vtorrent );
781
782static void
783restartChokeTimer( Torrent * t )
784{
785    tr_timerFree( &t->rechokeTimer );
786    if( t->isRunning )
787        t->rechokeTimer = tr_timerNew( t->manager->handle, rechokePulse, t, RECHOKE_PERIOD_MSEC );
788}
789
790static void
791rechokeNow( Torrent * t )
792{
793    rechokePulse( t );
794    restartChokeTimer( t );
795}
796
797static int
798rechokeSoonCB( void * vt )
799{
800    Torrent * t = vt;
801    rechokeNow( t );
802    t->rechokeSoonTimer = NULL;
803    return FALSE;
804}
805
806static void
807rechokeSoon( Torrent * t )
808{
809    if( t->rechokeSoonTimer == NULL )
810        t->rechokeSoonTimer = tr_timerNew( t->manager->handle,
811                                           rechokeSoonCB, t, SOON_MSEC );
812}
813
814static void
815msgsCallbackFunc( void * vpeer, void * vevent, void * vt )
816{
817    tr_peer * peer = vpeer;
818    Torrent * t = (Torrent *) vt;
819    const tr_peermsgs_event * e = (const tr_peermsgs_event *) vevent;
820    const int needLock = !torrentIsLocked( t );
821
822    if( needLock )
823        torrentLock( t );
824
825    switch( e->eventType )
826    {
827        case TR_PEERMSG_NEED_REQ:
828            if( t->refillTimer == NULL )
829                t->refillTimer = tr_timerNew( t->manager->handle,
830                                              refillPulse, t,
831                                              REFILL_PERIOD_MSEC );
832            break;
833
834        case TR_PEERMSG_CLIENT_HAVE:
835            broadcastClientHave( t, e->pieceIndex );
836            tr_torrentRecheckCompleteness( t->tor );
837            break;
838
839        case TR_PEERMSG_PEER_PROGRESS: { /* if we're both seeds, then disconnect. */
840#if 0
841            const int clientIsSeed = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
842            const int peerIsSeed = e->progress >= 1.0;
843            if( clientIsSeed && peerIsSeed )
844                peer->doPurge = 1;
845#endif
846            break;
847        }
848
849        case TR_PEERMSG_CLIENT_BLOCK:
850            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
851            break;
852
853        case TR_PEERMSG_GOT_ERROR:
854            peer->doPurge = 1;
855            reconnectSoon( t );
856            break;
857
858        default:
859            assert(0);
860    }
861
862    if( needLock )
863        torrentUnlock( t );
864}
865
866static void
867myHandshakeDoneCB( tr_handshake    * handshake,
868                   tr_peerIo       * io,
869                   int               isConnected,
870                   const uint8_t   * peer_id,
871                   void            * vmanager )
872{
873    int ok = isConnected;
874    uint16_t port;
875    const struct in_addr * in_addr;
876    tr_peerMgr * manager = (tr_peerMgr*) vmanager;
877    const uint8_t * hash = NULL;
878    Torrent * t;
879    tr_handshake * ours;
880
881    assert( io != NULL );
882    assert( isConnected==0 || isConnected==1 );
883
884    ours = tr_ptrArrayRemoveSorted( manager->handshakes,
885                                    handshake,
886                                    handshakeCompare );
887    assert( ours != NULL );
888    assert( ours == handshake );
889
890    in_addr = tr_peerIoGetAddress( io, &port );
891
892    if( !tr_peerIoHasTorrentHash( io ) ) /* incoming connection gone wrong? */
893    {
894        tr_peerIoFree( io );
895        --manager->connectionCount;
896        return;
897    }
898
899    hash = tr_peerIoGetTorrentHash( io );
900    t = getExistingTorrent( manager, hash );
901
902    if( t != NULL )
903        torrentLock( t );
904
905    if( !t || !t->isRunning )
906    {
907        tr_peerIoFree( io );
908        --manager->connectionCount;
909    }
910    else if( !ok )
911    {
912        /* if we couldn't connect or were snubbed,
913         * the peer's probably not worth remembering. */
914        tr_peer * peer = getExistingPeer( t, in_addr );
915        tr_peerIoFree( io );
916        --manager->connectionCount;
917        if( peer )
918            peer->doPurge = 1;
919    }
920    else /* looking good */
921    {
922        tr_peer * peer = getPeer( t, in_addr );
923        if( peer->msgs != NULL ) { /* we already have this peer */
924            tr_peerIoFree( io );
925            --manager->connectionCount;
926        } else {
927            peer->port = port;
928            peer->io = io;
929            peer->msgs = tr_peerMsgsNew( t->tor, peer );
930            tr_free( peer->client );
931            peer->client = peer_id ? tr_clientForId( peer_id ) : NULL;
932            peer->msgsTag = tr_peerMsgsSubscribe( peer->msgs, msgsCallbackFunc, t );
933            rechokeSoon( t );
934        }
935    }
936
937    if( t != NULL )
938        torrentUnlock( t );
939}
940
941static void
942initiateHandshake( tr_peerMgr * manager, tr_peerIo * io )
943{
944    tr_handshake * handshake;
945
946    assert( manager->lockThread!=0 );
947    assert( io != NULL );
948
949    handshake = tr_handshakeNew( io,
950                                 manager->handle->encryptionMode,
951                                 myHandshakeDoneCB,
952                                 manager );
953    ++manager->connectionCount;
954
955    tr_ptrArrayInsertSorted( manager->handshakes, handshake, handshakeCompare );
956}
957
958void
959tr_peerMgrAddIncoming( tr_peerMgr      * manager,
960                       struct in_addr  * addr,
961                       uint16_t          port,
962                       int               socket )
963{
964    managerLock( manager );
965
966    if( getExistingHandshake( manager, addr ) == NULL )
967    {
968        tr_peerIo * io = tr_peerIoNewIncoming( manager->handle, addr, port, socket );
969        initiateHandshake( manager, io );
970    }
971
972    managerUnlock( manager );
973}
974
975static void
976maybeAddNewAtom( Torrent * t, const struct in_addr * addr, uint16_t port, uint8_t flags, uint8_t from )
977{
978    if( !peerIsKnown( t, addr ) )
979    {
980        struct peer_atom * a = tr_new( struct peer_atom, 1 );
981        a->addr = *addr;
982        a->port = port;
983        a->flags = flags;
984        a->from = from;
985        a->time = 0;
986fprintf( stderr, "torrent [%s] getting a new atom: %s\n", t->tor->info.name, tr_peerIoAddrStr(&a->addr,a->port) );
987        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
988    }
989}
990
991void
992tr_peerMgrAddPex( tr_peerMgr     * manager,
993                  const uint8_t  * torrentHash,
994                  int              from,
995                  const tr_pex   * pex,
996                  int              pexCount )
997{
998    Torrent * t;
999    const tr_pex * end;
1000
1001    managerLock( manager );
1002
1003    t = getExistingTorrent( manager, torrentHash );
1004    for( end=pex+pexCount; pex!=end; ++pex )
1005        maybeAddNewAtom( t, &pex->in_addr, pex->port, pex->flags, from );
1006    reconnectSoon( t );
1007
1008    managerUnlock( manager );
1009}
1010
1011void
1012tr_peerMgrAddPeers( tr_peerMgr    * manager,
1013                    const uint8_t * torrentHash,
1014                    int             from,
1015                    const uint8_t * peerCompact,
1016                    int             peerCount )
1017{
1018    int i;
1019    const uint8_t * walk = peerCompact;
1020    Torrent * t;
1021
1022    managerLock( manager );
1023
1024    t = getExistingTorrent( manager, torrentHash );
1025    for( i=0; t!=NULL && i<peerCount; ++i )
1026    {
1027        struct in_addr addr;
1028        uint16_t port;
1029        memcpy( &addr, walk, 4 ); walk += 4;
1030        memcpy( &port, walk, 2 ); walk += 2;
1031        maybeAddNewAtom( t, &addr, port, 0, from );
1032    }
1033    reconnectSoon( t );
1034
1035    managerUnlock( manager );
1036}
1037
1038/**
1039***
1040**/
1041
1042int
1043tr_peerMgrIsAcceptingConnections( const tr_peerMgr * manager UNUSED )
1044{
1045    return TRUE; /* manager->connectionCount < MAX_CONNECTED_PEERS; */
1046}
1047
1048void
1049tr_peerMgrSetBlame( tr_peerMgr     * manager UNUSED,
1050                    const uint8_t  * torrentHash UNUSED,
1051                    int              pieceIndex UNUSED,
1052                    int              success UNUSED )
1053{
1054    /*fprintf( stderr, "FIXME: tr_peerMgrSetBlame\n" );*/
1055}
1056
1057int
1058tr_pexCompare( const void * va, const void * vb )
1059{
1060    const tr_pex * a = (const tr_pex *) va;
1061    const tr_pex * b = (const tr_pex *) vb;
1062    int i = memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
1063    if( i ) return i;
1064    if( a->port < b->port ) return -1;
1065    if( a->port > b->port ) return 1;
1066    return 0;
1067}
1068
1069int tr_pexCompare( const void * a, const void * b );
1070
1071static int
1072peerPrefersCrypto( const tr_peer * peer )
1073{
1074    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1075        return TRUE;
1076
1077    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1078        return FALSE;
1079
1080    return tr_peerIoIsEncrypted( peer->io );
1081};
1082
1083int
1084tr_peerMgrGetPeers( tr_peerMgr      * manager,
1085                    const uint8_t   * torrentHash,
1086                    tr_pex         ** setme_pex )
1087{
1088    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1089    int i, peerCount;
1090    const int isLocked = torrentIsLocked( t );
1091    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1092    tr_pex * pex = tr_new( tr_pex, peerCount );
1093    tr_pex * walk = pex;
1094
1095    if( !isLocked )
1096        torrentLock( (Torrent*)t );
1097
1098    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1099    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1100    pex = walk = tr_new( tr_pex, peerCount );
1101
1102    for( i=0; i<peerCount; ++i, ++walk )
1103    {
1104        const tr_peer * peer = peers[i];
1105
1106        walk->in_addr = peer->in_addr;
1107
1108        walk->port = peer->port;
1109
1110        walk->flags = 0;
1111        if( peerPrefersCrypto(peer) )  walk->flags |= 1;
1112        if( peer->progress >= 1.0 )    walk->flags |= 2;
1113    }
1114
1115    assert( ( walk - pex ) == peerCount );
1116    qsort( pex, peerCount, sizeof(tr_pex), tr_pexCompare );
1117    *setme_pex = pex;
1118
1119    if( !isLocked )
1120        torrentUnlock( (Torrent*)t );
1121
1122    return peerCount;
1123}
1124
1125void
1126tr_peerMgrStartTorrent( tr_peerMgr     * manager,
1127                        const uint8_t  * torrentHash )
1128{
1129    Torrent * t;
1130
1131    managerLock( manager );
1132
1133    t = getExistingTorrent( manager, torrentHash );
1134    t->isRunning = 1;
1135    restartChokeTimer( t );
1136    reconnectSoon( t );
1137
1138    managerUnlock( manager );
1139}
1140
1141static void
1142stopTorrent( Torrent * t )
1143{
1144    int i, size;
1145    tr_peer ** peers;
1146
1147    assert( torrentIsLocked( t ) );
1148
1149    t->isRunning = 0;
1150    tr_timerFree( &t->rechokeTimer );
1151    tr_timerFree( &t->reconnectTimer );
1152
1153    peers = getConnectedPeers( t, &size );
1154    for( i=0; i<size; ++i )
1155        disconnectPeer( peers[i] );
1156
1157    tr_free( peers );
1158}
1159void
1160tr_peerMgrStopTorrent( tr_peerMgr     * manager,
1161                       const uint8_t  * torrentHash)
1162{
1163    managerLock( manager );
1164
1165    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1166
1167    managerUnlock( manager );
1168}
1169
1170void
1171tr_peerMgrAddTorrent( tr_peerMgr * manager,
1172                      tr_torrent * tor )
1173{
1174    Torrent * t;
1175
1176    managerLock( manager );
1177
1178    assert( tor != NULL );
1179    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1180
1181    t = tr_new0( Torrent, 1 );
1182    t->manager = manager;
1183    t->tor = tor;
1184    t->pool = tr_ptrArrayNew( );
1185    t->peers = tr_ptrArrayNew( );
1186    t->requested = tr_bitfieldNew( tor->blockCount );
1187    restartChokeTimer( t );
1188    restartReconnectTimer( t );
1189
1190    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
1191    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1192
1193    managerUnlock( manager );
1194}
1195
1196void
1197tr_peerMgrRemoveTorrent( tr_peerMgr     * manager,
1198                         const uint8_t  * torrentHash )
1199{
1200    Torrent * t;
1201
1202    managerLock( manager );
1203
1204    t = getExistingTorrent( manager, torrentHash );
1205    assert( t != NULL );
1206    stopTorrent( t );
1207    freeTorrent( manager, t );
1208
1209    managerUnlock( manager );
1210}
1211
1212void
1213tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1214                               const uint8_t    * torrentHash,
1215                               int8_t           * tab,
1216                               int                tabCount )
1217{
1218    int i;
1219    const Torrent * t;
1220    const tr_torrent * tor;
1221    float interval;
1222
1223    managerLock( (tr_peerMgr*)manager );
1224
1225    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1226    tor = t->tor;
1227    interval = tor->info.pieceCount / (float)tabCount;
1228
1229    memset( tab, 0, tabCount );
1230
1231    for( i=0; i<tabCount; ++i )
1232    {
1233        const int piece = i * interval;
1234
1235        if( tor == NULL )
1236            tab[i] = 0;
1237        else if( tr_cpPieceIsComplete( tor->completion, piece ) )
1238            tab[i] = -1;
1239        else {
1240            int j, peerCount;
1241            const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1242            for( j=0; j<peerCount; ++j )
1243                if( tr_bitfieldHas( peers[j]->have, i ) )
1244                    ++tab[i];
1245        }
1246    }
1247
1248    managerUnlock( (tr_peerMgr*)manager );
1249}
1250
1251/* Returns the pieces that we and/or a connected peer has */
1252tr_bitfield*
1253tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1254                        const uint8_t    * torrentHash )
1255{
1256    int i, size;
1257    const Torrent * t;
1258    const tr_peer ** peers;
1259    tr_bitfield * pieces;
1260
1261    managerLock( (tr_peerMgr*)manager );
1262
1263    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1264    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1265    pieces = tr_bitfieldDup( tr_cpPieceBitfield( t->tor->completion ) );
1266    for( i=0; i<size; ++i )
1267        if( peers[i]->io != NULL )
1268            tr_bitfieldAnd( pieces, peers[i]->have );
1269
1270    managerUnlock( (tr_peerMgr*)manager );
1271    return pieces;
1272}
1273
1274void
1275tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1276                        const uint8_t    * torrentHash,
1277                        int              * setmePeersTotal,
1278                        int              * setmePeersConnected,
1279                        int              * setmePeersSendingToUs,
1280                        int              * setmePeersGettingFromUs,
1281                        int              * setmePeersFrom )
1282{
1283    int i, size;
1284    const Torrent * t;
1285    const tr_peer ** peers;
1286
1287    managerLock( (tr_peerMgr*)manager );
1288
1289    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1290    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1291
1292    *setmePeersTotal          = size;
1293    *setmePeersConnected      = 0;
1294    *setmePeersSendingToUs    = 0;
1295    *setmePeersGettingFromUs  = 0;
1296
1297    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1298        setmePeersFrom[i] = 0;
1299
1300    for( i=0; i<size; ++i )
1301    {
1302        const tr_peer * peer = peers[i];
1303
1304        if( peer->io == NULL ) /* not connected */
1305            continue;
1306
1307        ++*setmePeersConnected;
1308
1309        ++setmePeersFrom[peer->from];
1310
1311        if( tr_peerIoGetRateToPeer( peer->io ) > 0.01 )
1312            ++*setmePeersGettingFromUs;
1313
1314        if( tr_peerIoGetRateToClient( peer->io ) > 0.01 )
1315            ++*setmePeersSendingToUs;
1316    }
1317
1318    managerUnlock( (tr_peerMgr*)manager );
1319}
1320
1321struct tr_peer_stat *
1322tr_peerMgrPeerStats( const tr_peerMgr  * manager,
1323                     const uint8_t     * torrentHash,
1324                     int               * setmeCount UNUSED )
1325{
1326    int i, size;
1327    const Torrent * t;
1328    const tr_peer ** peers;
1329    tr_peer_stat * ret;
1330
1331    assert( manager != NULL );
1332    managerLock( (tr_peerMgr*)manager );
1333
1334    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1335    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1336
1337    ret = tr_new0( tr_peer_stat, size );
1338
1339    for( i=0; i<size; ++i )
1340    {
1341        const tr_peer * peer = peers[i];
1342        const int live = peer->io != NULL;
1343        tr_peer_stat * stat = ret + i;
1344
1345        tr_netNtop( &peer->in_addr, stat->addr, sizeof(stat->addr) );
1346        stat->port             = peer->port;
1347        stat->from             = peer->from;
1348        stat->client           = peer->client;
1349        stat->progress         = peer->progress;
1350        stat->isConnected      = live ? 1 : 0;
1351        stat->isEncrypted      = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1352        stat->uploadToRate     = tr_peerIoGetRateToPeer( peer->io );
1353        stat->downloadFromRate = tr_peerIoGetRateToClient( peer->io );
1354        stat->isDownloading    = stat->uploadToRate > 0.01;
1355        stat->isUploading      = stat->downloadFromRate > 0.01;
1356    }
1357
1358    *setmeCount = size;
1359
1360    managerUnlock( (tr_peerMgr*)manager );
1361    return ret;
1362}
1363
1364/**
1365***
1366**/
1367
1368typedef struct
1369{
1370    tr_peer * peer;
1371    float rate;
1372    int randomKey;
1373    int preferred;
1374    int doUnchoke;
1375}
1376ChokeData;
1377
1378static int
1379compareChoke( const void * va, const void * vb )
1380{
1381    const ChokeData * a = ( const ChokeData * ) va;
1382    const ChokeData * b = ( const ChokeData * ) vb;
1383
1384    if( a->preferred != b->preferred )
1385        return a->preferred ? -1 : 1;
1386
1387    if( a->preferred )
1388    {
1389        if( a->rate > b->rate ) return -1;
1390        if( a->rate < b->rate ) return 1;
1391        return 0;
1392    }
1393    else
1394    {
1395        return a->randomKey - b->randomKey;
1396    }
1397}
1398
1399static int
1400clientIsSnubbedBy( const tr_peer * peer )
1401{
1402    assert( peer != NULL );
1403
1404    return peer->peerSentPieceDataAt < (time(NULL) - SNUBBED_SEC);
1405}
1406
1407/**
1408***
1409**/
1410
1411static void
1412rechokeLeech( Torrent * t )
1413{
1414    int i, peerCount, size=0, unchoked=0;
1415    const time_t ignorePeersNewerThan = time(NULL) - MIN_CHOKE_PERIOD_SEC;
1416    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1417    ChokeData * choke = tr_new0( ChokeData, peerCount );
1418
1419    assert( torrentIsLocked( t ) );
1420   
1421    /* sort the peers by preference and rate */
1422    for( i=0; i<peerCount; ++i )
1423    {
1424        tr_peer * peer = peers[i];
1425        ChokeData * node;
1426        if( peer->chokeChangedAt > ignorePeersNewerThan )
1427            continue;
1428
1429        node = &choke[size++];
1430        node->peer = peer;
1431        node->preferred = peer->peerIsInterested && !clientIsSnubbedBy(peer);
1432        node->randomKey = tr_rand( INT_MAX );
1433        node->rate = tr_peerIoGetRateToClient( peer->io );
1434    }
1435
1436    qsort( choke, size, sizeof(ChokeData), compareChoke );
1437
1438    for( i=0; i<size && i<NUM_UNCHOKED_PEERS_PER_TORRENT; ++i ) {
1439        choke[i].doUnchoke = 1;
1440        ++unchoked;
1441    }
1442
1443    for( ; i<size; ++i ) {
1444        choke[i].doUnchoke = 1;
1445        ++unchoked;
1446        if( choke[i].peer->peerIsInterested )
1447            break;
1448    }
1449
1450    for( i=0; i<size; ++i )
1451        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1452
1453    /* cleanup */
1454    tr_free( choke );
1455    tr_free( peers );
1456}
1457
1458static void
1459rechokeSeed( Torrent * t )
1460{
1461    int i, size;
1462    tr_peer ** peers;
1463
1464    assert( torrentIsLocked( t ) );
1465
1466    peers = getConnectedPeers( t, &size );
1467
1468    /* FIXME */
1469    for( i=0; i<size; ++i )
1470        tr_peerMsgsSetChoke( peers[i]->msgs, FALSE );
1471
1472    tr_free( peers );
1473}
1474
1475static int
1476rechokePulse( void * vtorrent )
1477{
1478    Torrent * t = vtorrent;
1479    torrentLock( t );
1480
1481    const int done = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1482    if( done )
1483        rechokeLeech( vtorrent );
1484    else
1485        rechokeSeed( vtorrent );
1486
1487    torrentUnlock( t );
1488    return TRUE;
1489}
1490
1491/***
1492****
1493****
1494****
1495***/
1496
1497struct tr_connection
1498{
1499    tr_peer * peer;
1500    double throughput;
1501};
1502
1503#define LAISSEZ_FAIRE_PERIOD_SECS 60
1504
1505static int
1506compareConnections( const void * va, const void * vb )
1507{
1508    const struct tr_connection * a = va;
1509    const struct tr_connection * b = vb;
1510    if( a->throughput < b->throughput ) return -1;
1511    if( a->throughput > b->throughput ) return 1;
1512    return 0;
1513}
1514
1515static struct tr_connection *
1516getWeakConnections( Torrent * t, int * setmeSize )
1517{
1518    int i, insize, outsize;
1519    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( t->peers, &insize );
1520    struct tr_connection * ret = tr_new( struct tr_connection, insize );
1521    const int clientIsSeed = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1522    const time_t now = time( NULL );
1523
1524    assert( torrentIsLocked( t ) );
1525
1526    for( i=outsize=0; i<insize; ++i )
1527    {
1528        tr_peer * peer = peers[i];
1529        int isWeak;
1530        const int peerIsSeed = peer->progress >= 1.0;
1531        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1532        const double throughput = (2*tr_peerIoGetRateToPeer( peer->io ))
1533                                + tr_peerIoGetRateToClient( peer->io );
1534
1535        /* if we're both seeds, give a little bit of time for
1536         * a mutual pex -- peer-msgs initiates a pex exchange
1537         * on startup -- and then disconnect */
1538        if( peerIsSeed && clientIsSeed && (now-atom->time >= 30) )
1539            isWeak = TRUE;
1540        else if( ( now - atom->time ) < LAISSEZ_FAIRE_PERIOD_SECS )
1541            isWeak = FALSE;
1542        else if( throughput >= 5 )
1543            isWeak = FALSE;
1544        else
1545            isWeak = TRUE;
1546
1547        if( isWeak )
1548        {
1549            ret[outsize].peer = peer;
1550            ret[outsize].throughput = throughput;
1551            ++outsize;
1552        }
1553    }
1554
1555    qsort( ret, outsize, sizeof(struct tr_connection), compareConnections );
1556    *setmeSize = outsize;
1557    return ret;
1558}
1559
1560static int
1561compareAtomByTime( const void * va, const void * vb )
1562{
1563    const struct peer_atom * a = * (const struct peer_atom**) va;
1564    const struct peer_atom * b = * (const struct peer_atom**) vb;
1565    if( a->time < b->time ) return -1;
1566    if( a->time > b->time ) return 1;
1567    return 0;
1568}
1569
1570static struct peer_atom **
1571getPeerCandidates( Torrent * t, int * setmeSize )
1572{
1573    int i, insize, outsize;
1574    struct peer_atom ** atoms;
1575    struct peer_atom ** ret;
1576    const time_t now = time( NULL );
1577    const int seed = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1578
1579    assert( torrentIsLocked( t ) );
1580
1581    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &insize );
1582    ret = tr_new( struct peer_atom*, insize );
1583    for( i=outsize=0; i<insize; ++i )
1584    {
1585        struct peer_atom * atom = atoms[i];
1586
1587        /* we don't need two connections to the same peer... */
1588        if( peerIsInUse( t, &atom->addr ) ) {
1589            fprintf( stderr, "RECONNECT peer %d (%s) is in use...\n", i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1590            continue;
1591        }
1592
1593        /* no need to connect if we're both seeds... */
1594        if( seed && ( atom->flags & 2 ) ) {
1595            fprintf( stderr, "RECONNECT peer %d (%s) is a seed and so are we...\n", i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1596            continue;
1597        }
1598
1599        /* if we used this peer recently, give someone else a turn */
1600        if( ( now - atom->time ) <  LAISSEZ_FAIRE_PERIOD_SECS ) {
1601            fprintf( stderr, "RECONNECT peer %d (%s) is in its grace period...\n", i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1602            continue;
1603        }
1604
1605        ret[outsize++] = atom;
1606    }
1607
1608    qsort( ret, outsize, sizeof(struct peer_atom*), compareAtomByTime );
1609    *setmeSize = outsize;
1610    return ret;
1611}
1612
1613static int
1614reconnectPulse( void * vtorrent )
1615{
1616    Torrent * t = vtorrent;
1617    struct peer_atom ** candidates;
1618    struct tr_connection * connections;
1619    int i, nCandidates, nConnections, nCull, nAdd;
1620    int peerCount;
1621
1622    torrentLock( t );
1623
1624    connections = getWeakConnections( t, &nConnections );
1625    candidates = getPeerCandidates( t, &nCandidates );
1626
1627    /* figure out how many peers to disconnect */
1628    nCull = nConnections-4; 
1629
1630fprintf( stderr, "RECONNECT pulse for [%s]: %d connections, %d candidates, %d atoms, %d cull\n", t->tor->info.name, nConnections, nCandidates, tr_ptrArraySize(t->pool), nCull );
1631
1632for( i=0; i<nConnections; ++i )
1633fprintf( stderr, "connection #%d: %s @ %.2f\n", i+1, tr_peerIoAddrStr( &connections[i].peer->in_addr, connections[i].peer->port ), connections[i].throughput );
1634
1635    /* disconnect some peers */
1636    for( i=0; i<nCull && i<nConnections; ++i ) {
1637        const double throughput = connections[i].throughput;
1638        tr_peer * peer = connections[i].peer;
1639        fprintf( stderr, "RECONNECT culling peer %s, whose throughput was %f\n", tr_peerIoAddrStr(&peer->in_addr, peer->port), throughput );
1640        removePeer( t, peer );
1641    }
1642
1643    /* add some new ones */
1644    peerCount = tr_ptrArraySize( t->peers );
1645    nAdd = MAX_CONNECTED_PEERS_PER_TORRENT - peerCount;
1646    for( i=0; i<nAdd && i<nCandidates; ++i ) {
1647        struct peer_atom * atom = candidates[i];
1648        tr_peerIo * io = tr_peerIoNewOutgoing( t->manager->handle, &atom->addr, atom->port, t->hash );
1649fprintf( stderr, "RECONNECT adding an outgoing connection...\n" );
1650        initiateHandshake( t->manager, io );
1651        atom->time = time( NULL );
1652    }
1653
1654    /* cleanup */
1655    tr_free( connections );
1656    tr_free( candidates );
1657    torrentUnlock( t );
1658    return TRUE;
1659}
Note: See TracBrowser for help on using the repository browser.