source: trunk/libtransmission/peer-mgr.c @ 3253

Last change on this file since 3253 was 3253, checked in by charles, 15 years ago

follow BT `best practices' more closely w.r.t. choosing which pieces to download

  • Property svn:keywords set to Date Rev Author Id
File size: 43.9 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 3253 2007-10-01 14:24:22Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17#include <limits.h> /* INT_MAX */
18
19#include "transmission.h"
20#include "clients.h"
21#include "completion.h"
22#include "crypto.h"
23#include "handshake.h"
24#include "net.h"
25#include "peer-io.h"
26#include "peer-mgr.h"
27#include "peer-mgr-private.h"
28#include "peer-msgs.h"
29#include "platform.h"
30#include "ptrarray.h"
31#include "ratecontrol.h"
32#include "trevent.h"
33#include "utils.h"
34
35#include "pthread.h"
36
37enum
38{
39    /* how frequently to change which peers are choked */
40    RECHOKE_PERIOD_MSEC = (15 * 1000),
41
42    /* how frequently to decide which peers live and die */
43    RECONNECT_PERIOD_MSEC = (15 * 1000),
44
45    /* how frequently to refill peers' request lists */
46    REFILL_PERIOD_MSEC = 1000,
47
48    /* how many peers to unchoke per-torrent. */
49    /* FIXME: make this user-configurable? */
50    NUM_UNCHOKED_PEERS_PER_TORRENT = 8,
51
52    /* don't change a peer's choke status more often than this */
53    MIN_CHOKE_PERIOD_SEC = 10,
54
55    /* how soon is `soon' in the rechokeSoon, reconnecSoon funcs */
56    SOON_MSEC = 1000,
57
58    /* following the BT spec, we consider ourselves `snubbed' if
59     * we're we don't get piece data from a peer in this long */
60    SNUBBED_SEC = 60,
61
62    /* this is arbitrary and, hopefully, temporary until we come up
63     * with a better idea for managing the connection limits */
64    MAX_CONNECTED_PEERS_PER_TORRENT = 60,
65};
66
67/**
68***
69**/
70
71/* We keep one of these for every peer we know about, whether
72 * it's connected or not, so the struct must be small.
73 * When our current connections underperform, we dip back
74 * int this list for new ones. */
75struct peer_atom
76{   
77    uint8_t from;
78    uint8_t flags; /* these match the added_f flags */
79    uint16_t port;
80    struct in_addr addr; 
81    time_t time;
82};
83
84typedef struct
85{
86    uint8_t hash[SHA_DIGEST_LENGTH];
87    tr_ptrArray * pool; /* struct peer_atom */
88    tr_ptrArray * peers; /* tr_peer */
89    tr_timer * reconnectTimer;
90    tr_timer * reconnectSoonTimer;
91    tr_timer * rechokeTimer;
92    tr_timer * rechokeSoonTimer;
93    tr_timer * refillTimer;
94    tr_torrent * tor;
95    tr_bitfield * requested;
96
97    unsigned int isRunning : 1;
98
99    struct tr_peerMgr * manager;
100}
101Torrent;
102
103struct tr_peerMgr
104{
105    tr_handle * handle;
106    tr_ptrArray * torrents; /* Torrent */
107    int connectionCount;
108    tr_ptrArray * handshakes; /* in-process */
109    tr_lock * lock;
110    pthread_t lockThread;
111};
112
113/**
114***
115**/
116
117static void
118managerLock( struct tr_peerMgr * manager )
119{
120    assert( manager->lockThread != pthread_self() );
121    tr_lockLock( manager->lock );
122    manager->lockThread = pthread_self();
123}
124static void
125managerUnlock( struct tr_peerMgr * manager )
126{
127    assert( manager->lockThread == pthread_self() );
128    manager->lockThread = 0;
129    tr_lockUnlock( manager->lock );
130}
131static void
132torrentLock( Torrent * torrent )
133{
134    managerLock( torrent->manager );
135}
136static void
137torrentUnlock( Torrent * torrent )
138{
139    managerUnlock( torrent->manager );
140}
141static int
142torrentIsLocked( const Torrent * t )
143{
144    return ( t != NULL )
145        && ( t->manager != NULL )
146        && ( t->manager->lockThread != 0 )
147        && ( t->manager->lockThread == pthread_self( ) );
148}
149
150/**
151***
152**/
153
154static int
155compareAddresses( const struct in_addr * a, const struct in_addr * b )
156{
157    return tr_compareUint32( a->s_addr, b->s_addr );
158}
159
160static int
161handshakeCompareToAddr( const void * va, const void * vb )
162{
163    const tr_handshake * a = va;
164    return compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
165}
166
167static int
168handshakeCompare( const void * a, const void * b )
169{
170    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
171}
172
173static tr_handshake*
174getExistingHandshake( tr_peerMgr * mgr, const struct in_addr * in_addr )
175{
176    return tr_ptrArrayFindSorted( mgr->handshakes,
177                                  in_addr,
178                                  handshakeCompareToAddr );
179}
180
181static int
182comparePeerAtomToAddress( const void * va, const void * vb )
183{
184    const struct peer_atom * a = va;
185    return compareAddresses( &a->addr, vb );
186}
187
188static int
189comparePeerAtoms( const void * va, const void * vb )
190{
191    const struct peer_atom * b = vb;
192    return comparePeerAtomToAddress( va, &b->addr );
193}
194
195/**
196***
197**/
198
199static int
200torrentCompare( const void * va, const void * vb )
201{
202    const Torrent * a = va;
203    const Torrent * b = vb;
204    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
205}
206
207static int
208torrentCompareToHash( const void * va, const void * vb )
209{
210    const Torrent * a = (const Torrent*) va;
211    const uint8_t * b_hash = (const uint8_t*) vb;
212    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
213}
214
215static Torrent*
216getExistingTorrent( tr_peerMgr * manager, const uint8_t * hash )
217{
218    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
219                                             hash,
220                                             torrentCompareToHash );
221}
222
223static int
224peerCompare( const void * va, const void * vb )
225{
226    const tr_peer * a = (const tr_peer *) va;
227    const tr_peer * b = (const tr_peer *) vb;
228    return compareAddresses( &a->in_addr, &b->in_addr );
229}
230
231static int
232peerCompareToAddr( const void * va, const void * vb )
233{
234    const tr_peer * a = (const tr_peer *) va;
235    return compareAddresses( &a->in_addr, vb );
236}
237
238static tr_peer*
239getExistingPeer( Torrent * torrent, const struct in_addr * in_addr )
240{
241    assert( torrentIsLocked( torrent ) );
242    assert( in_addr != NULL );
243
244    return (tr_peer*) tr_ptrArrayFindSorted( torrent->peers,
245                                             in_addr,
246                                             peerCompareToAddr );
247}
248
249static struct peer_atom*
250getExistingAtom( const Torrent * t, const struct in_addr * addr )
251{
252    assert( torrentIsLocked( t ) );
253    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
254}
255
256static int
257peerIsKnown( const Torrent * t, const struct in_addr * addr )
258{
259    return getExistingAtom( t, addr ) != NULL;
260}
261
262static int
263peerIsInUse( const Torrent * t, const struct in_addr * addr )
264{
265    assert( torrentIsLocked ( t ) );
266
267    return ( getExistingPeer( (Torrent*)t, addr ) != NULL )
268        || ( getExistingHandshake( ((Torrent*)t)->manager, addr ) != NULL );
269}
270
271static tr_peer*
272getPeer( Torrent * torrent, const struct in_addr * in_addr )
273{
274    tr_peer * peer;
275
276    assert( torrentIsLocked( torrent ) );
277
278    peer = getExistingPeer( torrent, in_addr );
279
280    if( peer == NULL )
281    {
282        peer = tr_new0( tr_peer, 1 );
283        memcpy( &peer->in_addr, in_addr, sizeof(struct in_addr) );
284        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
285    }
286
287    return peer;
288}
289
290static void
291disconnectPeer( tr_peer * peer )
292{
293    assert( peer != NULL );
294
295    tr_peerIoFree( peer->io );
296    peer->io = NULL;
297
298    if( peer->msgs != NULL )
299    {
300        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
301        tr_peerMsgsFree( peer->msgs );
302        peer->msgs = NULL;
303    }
304
305    tr_bitfieldFree( peer->have );
306    peer->have = NULL;
307
308    tr_bitfieldFree( peer->blame );
309    peer->blame = NULL;
310
311    tr_bitfieldFree( peer->banned );
312    peer->banned = NULL;
313}
314
315static void
316freePeer( tr_peer * peer )
317{
318    disconnectPeer( peer );
319    tr_free( peer->client );
320    tr_free( peer );
321}
322
323static void
324removePeer( Torrent * t, tr_peer * peer )
325{
326    tr_peer * removed;
327    struct peer_atom * atom;
328
329    assert( torrentIsLocked( t ) );
330
331    atom = getExistingAtom( t, &peer->in_addr );
332    assert( atom != NULL );
333    atom->time = time( NULL );
334
335    removed = tr_ptrArrayRemoveSorted  ( t->peers, peer, peerCompare );
336    assert( removed == peer );
337    freePeer( removed );
338}
339
340static void
341freeTorrent( tr_peerMgr * manager, Torrent * t )
342{
343    uint8_t hash[SHA_DIGEST_LENGTH];
344
345    assert( t != NULL );
346    assert( t->peers != NULL );
347    assert( torrentIsLocked( t ) );
348    assert( getExistingTorrent( manager, t->hash ) != NULL );
349
350    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
351
352    tr_timerFree( &t->reconnectTimer );
353    tr_timerFree( &t->reconnectSoonTimer );
354    tr_timerFree( &t->rechokeTimer );
355    tr_timerFree( &t->rechokeSoonTimer );
356    tr_timerFree( &t->refillTimer );
357
358    tr_bitfieldFree( t->requested );
359    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
360    tr_ptrArrayFree( t->peers, (PtrArrayForeachFunc)freePeer );
361    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
362    tr_free( t );
363
364    assert( getExistingTorrent( manager, hash ) == NULL );
365}
366
367/**
368***
369**/
370
371struct tr_bitfield *
372tr_peerMgrGenerateAllowedSet( const uint32_t         setCount,
373                              const uint32_t         pieceCount,
374                              const uint8_t          infohash[20],
375                              const struct in_addr * ip )
376{
377    /* This has been checked against the spec example implementation. Feeding it with :
378    setCount = 9, pieceCount = 1313, infohash = Oxaa,0xaa,...0xaa, ip = 80.4.4.200
379generate :
380    1059, 431, 808, 1217, 287, 376, 1188, 353, 508
381    but since we're storing in a bitfield, it won't be in this order... */
382    /* TODO : We should translate link-local IPv4 adresses to external IP,
383     * so that being on same local network gives us the same allowed pieces */
384   
385    printf( "%d piece allowed fast set for torrent with %d pieces and hex infohash\n", setCount, pieceCount );
386    printf( "%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x for node with IP %s:\n",
387            infohash[0], infohash[1], infohash[2], infohash[3], infohash[4], infohash[5], infohash[6], infohash[7], infohash[8], infohash[9],
388            infohash[10], infohash[11], infohash[12], infohash[13], infohash[14], infohash[15], infohash[16], infohash[7], infohash[18], infohash[19],
389            inet_ntoa( *ip ) );
390   
391    uint8_t *seed = malloc(4 + SHA_DIGEST_LENGTH);
392    char buf[4];
393    uint32_t allowedPieceCount = 0;
394    tr_bitfield_t * ret;
395   
396    ret = tr_bitfieldNew( pieceCount );
397   
398    /* We need a seed based on most significant bytes of peer address
399        concatenated with torrent's infohash */
400    *(uint32_t*)buf = ntohl( htonl(ip->s_addr) & 0xffffff00 );
401   
402    memcpy( seed, &buf, 4 );
403    memcpy( seed + 4, infohash, SHA_DIGEST_LENGTH );
404   
405    tr_sha1( seed, seed, 4 + SHA_DIGEST_LENGTH, NULL );
406   
407    while ( allowedPieceCount < setCount )
408    {
409        int i;
410        for ( i = 0 ; i < 5 && allowedPieceCount < setCount ; i++ )
411        {
412            /* We generate indices from 4-byte chunks of the seed */
413            uint32_t j = i * 4;
414            uint32_t y = ntohl( *(uint32_t*)(seed + j) );
415            uint32_t index = y % pieceCount;
416           
417            if ( !tr_bitfieldHas( ret, index ) )
418            {
419                tr_bitfieldAdd( ret, index );
420                allowedPieceCount++;
421            }
422        }
423        /* We randomize the seed, in case we need to iterate more */
424        tr_sha1( seed, seed, SHA_DIGEST_LENGTH, NULL );
425    }
426    tr_free( seed );
427   
428    return ret;
429}
430
431tr_peerMgr*
432tr_peerMgrNew( tr_handle * handle )
433{
434    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
435    m->handle = handle;
436    m->torrents = tr_ptrArrayNew( );
437    m->handshakes = tr_ptrArrayNew( );
438    m->lock = tr_lockNew( );
439    return m;
440}
441
442void
443tr_peerMgrFree( tr_peerMgr * manager )
444{
445    managerLock( manager );
446
447    tr_ptrArrayFree( manager->handshakes, (PtrArrayForeachFunc)tr_handshakeAbort );
448    tr_ptrArrayFree( manager->torrents, (PtrArrayForeachFunc)freeTorrent );
449
450    managerUnlock( manager );
451    tr_lockFree( manager->lock );
452    tr_free( manager );
453}
454
455static tr_peer**
456getConnectedPeers( Torrent * t, int * setmeCount )
457{
458    int i, peerCount, connectionCount;
459    tr_peer **peers;
460    tr_peer **ret;
461
462    assert( torrentIsLocked( t ) );
463
464    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
465    ret = tr_new( tr_peer*, peerCount );
466
467    for( i=connectionCount=0; i<peerCount; ++i )
468        if( peers[i]->msgs != NULL )
469            ret[connectionCount++] = peers[i];
470
471    *setmeCount = connectionCount;
472    return ret;
473}
474
475/***
476****  Refill
477***/
478
479struct tr_refill_piece
480{
481    tr_priority_t priority;
482    uint32_t piece;
483    uint32_t peerCount;
484    uint32_t fastAllowed;
485    uint32_t random;
486};
487
488static int
489compareRefillPiece (const void * aIn, const void * bIn)
490{
491    const struct tr_refill_piece * a = aIn;
492    const struct tr_refill_piece * b = bIn;
493
494    /* if one piece has a higher priority, it goes first */
495    if (a->priority != b->priority)
496        return a->priority > b->priority ? -1 : 1;
497
498    /* otherwise if one has fewer peers, it goes first */
499    if (a->peerCount != b->peerCount)
500        return a->peerCount < b->peerCount ? -1 : 1;
501   
502    /* otherwise if one *might be* fastallowed to us */
503    if (a->fastAllowed != b->fastAllowed)
504        return a->fastAllowed < b->fastAllowed ? -1 : 1;
505
506    /* otherwise go with our random seed */
507    return tr_compareUint32( a->random, b->random );
508}
509
510static int
511isPieceInteresting( const tr_torrent  * tor,
512                    int                 piece )
513{
514    if( tor->info.pieces[piece].dnd ) /* we don't want it */
515        return 0;
516
517    if( tr_cpPieceIsComplete( tor->completion, piece ) ) /* we already have it */
518        return 0;
519
520    return 1;
521}
522
523static uint32_t*
524getPreferredPieces( Torrent     * t,
525                    uint32_t    * pieceCount )
526{
527    const tr_torrent * tor = t->tor;
528    const tr_info * inf = &tor->info;
529    int i;
530    uint32_t poolSize = 0;
531    uint32_t * pool = tr_new( uint32_t, inf->pieceCount );
532    int peerCount;
533    tr_peer** peers;
534
535    assert( torrentIsLocked( t ) );
536
537    peers = getConnectedPeers( t, &peerCount );
538
539    for( i=0; i<inf->pieceCount; ++i )
540        if( isPieceInteresting( tor, i ) )
541            pool[poolSize++] = i;
542
543    /* sort the pool from most interesting to least... */
544    if( poolSize > 1 )
545    {
546        uint32_t j;
547        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
548
549        for( j=0; j<poolSize; ++j )
550        {
551            int k;
552            const int piece = pool[j];
553            struct tr_refill_piece * setme = p + j;
554
555            setme->piece = piece;
556            setme->priority = inf->pieces[piece].priority;
557            setme->peerCount = 0;
558            setme->fastAllowed = 0;
559            setme->random = tr_rand( UINT32_MAX );
560            /* FIXME */
561//            setme->fastAllowed = tr_bitfieldHas( t->tor->allowedList, i);
562
563            for( k=0; k<peerCount; ++k ) {
564                const tr_peer * peer = peers[k];
565                if( peer->peerIsInterested && !peer->clientIsChoked && tr_bitfieldHas( peer->have, piece ) )
566                    ++setme->peerCount;
567            }
568        }
569
570        qsort (p, poolSize, sizeof(struct tr_refill_piece), compareRefillPiece);
571
572        for( j=0; j<poolSize; ++j )
573            pool[j] = p[j].piece;
574
575        tr_free( p );
576    }
577
578#if 0
579fprintf (stderr, "new pool: ");
580for (i=0; i<15 && i<(int)poolSize; ++i ) fprintf (stderr, "%d, ", (int)pool[i] );
581fprintf (stderr, "\n");
582#endif
583    tr_free( peers );
584
585    *pieceCount = poolSize;
586    return pool;
587}
588
589static uint64_t*
590getPreferredBlocks( Torrent * t, uint64_t * setmeCount )
591{
592    uint32_t i;
593    uint32_t pieceCount;
594    uint32_t * pieces;
595    uint64_t *req, *unreq, *ret, *walk;
596    int reqCount, unreqCount;
597    const tr_torrent * tor = t->tor;
598
599    assert( torrentIsLocked( t ) );
600
601    pieces = getPreferredPieces( t, &pieceCount );
602/*fprintf( stderr, "REFILL refillPulse for {%s} got %d of %d pieces\n", tor->info.name, (int)pieceCount, t->tor->info.pieceCount );*/
603
604    req = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
605    reqCount = 0;
606    unreq = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
607    unreqCount = 0;
608
609    for( i=0; i<pieceCount; ++i ) {
610        const uint32_t index = pieces[i];
611        const int begin = tr_torPieceFirstBlock( tor, index );
612        const int end = begin + tr_torPieceCountBlocks( tor, (int)index );
613        int block;
614        for( block=begin; block<end; ++block )
615            if( tr_cpBlockIsComplete( tor->completion, block ) )
616                continue;
617            else if( tr_bitfieldHas( t->requested, block ) )
618                req[reqCount++] = block;
619            else
620                unreq[unreqCount++] = block;
621    }
622
623/*fprintf( stderr, "REFILL refillPulse for {%s} reqCount is %d, unreqCount is %d\n", tor->info.name, (int)reqCount, (int)unreqCount );*/
624    ret = walk = tr_new( uint64_t, unreqCount + reqCount );
625    memcpy( walk, unreq, sizeof(uint64_t) * unreqCount );
626    walk += unreqCount;
627    memcpy( walk, req, sizeof(uint64_t) * reqCount );
628    walk += reqCount;
629    assert( ( walk - ret ) == ( unreqCount + reqCount ) );
630    *setmeCount = walk - ret;
631
632    tr_free( req );
633    tr_free( unreq );
634    tr_free( pieces );
635
636    return ret;
637}
638
639static int
640refillPulse( void * vtorrent )
641{
642    Torrent * t = vtorrent;
643    tr_torrent * tor = t->tor;
644    uint32_t i;
645    int peerCount;
646    tr_peer ** peers;
647    uint64_t blockCount;
648    uint64_t * blocks;
649
650    if( !t->isRunning )
651        return TRUE;
652    if( tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE )
653        return TRUE;
654
655    torrentLock( t );
656
657    blocks = getPreferredBlocks( t, &blockCount );
658    peers = getConnectedPeers( t, &peerCount );
659
660/*fprintf( stderr, "REFILL refillPulse for {%s} got %d blocks\n", tor->info.name, (int)blockCount );*/
661
662    for( i=0; peerCount && i<blockCount; ++i )
663    {
664        const int block = blocks[i];
665        const uint32_t index = tr_torBlockPiece( tor, block );
666        const uint32_t begin = (block * tor->blockSize) - (index * tor->info.pieceSize);
667        const uint32_t length = tr_torBlockCountBytes( tor, block );
668        int j;
669        assert( _tr_block( tor, index, begin ) == block );
670        assert( begin < (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
671        assert( (begin + length) <= (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
672
673
674        /* find a peer who can ask for this block */
675        for( j=0; j<peerCount; )
676        {
677            const int val = tr_peerMsgsAddRequest( peers[j]->msgs, index, begin, length );
678            switch( val )
679            {
680                case TR_ADDREQ_FULL: 
681                case TR_ADDREQ_CLIENT_CHOKED:
682                    memmove( peers+j, peers+j+1, sizeof(tr_peer*)*(--peerCount-j) );
683                    break;
684
685                case TR_ADDREQ_MISSING: 
686                    ++j;
687                    break;
688
689                case TR_ADDREQ_OK:
690                    /*fprintf( stderr, "REFILL peer %p took the request for block %d\n", peers[j]->msgs, block );*/
691                    tr_bitfieldAdd( t->requested, block );
692                    j = peerCount;
693                    break;
694
695                default:
696                    assert( 0 && "unhandled value" );
697                    break;
698            }
699        }
700    }
701
702    /* cleanup */
703    tr_free( peers );
704    tr_free( blocks );
705
706    t->refillTimer = NULL;
707
708    torrentUnlock( t );
709    return FALSE;
710}
711
712static void
713broadcastClientHave( Torrent * t, uint32_t index )
714{
715    int i, size;
716    tr_peer ** peers;
717
718    assert( torrentIsLocked( t ) );
719
720    peers = getConnectedPeers( t, &size );
721    for( i=0; i<size; ++i )
722        tr_peerMsgsHave( peers[i]->msgs, index );
723    tr_free( peers );
724}
725
726static void
727broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
728{
729    int i, size;
730    tr_peer ** peers;
731
732    assert( torrentIsLocked( t ) );
733
734    peers = getConnectedPeers( t, &size );
735    for( i=0; i<size; ++i )
736        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
737    tr_free( peers );
738}
739
740/**
741***
742**/
743
744static int reconnectPulse( void * vtorrent );
745
746static void
747restartReconnectTimer( Torrent * t )
748{
749    tr_timerFree( &t->reconnectTimer );
750    if( t->isRunning )
751        t->reconnectTimer = tr_timerNew( t->manager->handle, reconnectPulse, t, RECONNECT_PERIOD_MSEC );
752}
753
754static void
755reconnectNow( Torrent * t )
756{
757    reconnectPulse( t );
758    restartReconnectTimer( t );
759}
760
761static int
762reconnectSoonCB( void * vt )
763{
764    Torrent * t = vt;
765    reconnectNow( t );
766    t->reconnectSoonTimer = NULL;
767    return FALSE;
768}
769
770static void
771reconnectSoon( Torrent * t )
772{
773    if( t->reconnectSoonTimer == NULL )
774        t->reconnectSoonTimer = tr_timerNew( t->manager->handle,
775                                             reconnectSoonCB, t, SOON_MSEC );
776}
777
778/**
779***
780**/
781
782static int rechokePulse( void * vtorrent );
783
784static void
785restartChokeTimer( Torrent * t )
786{
787    tr_timerFree( &t->rechokeTimer );
788    if( t->isRunning )
789        t->rechokeTimer = tr_timerNew( t->manager->handle, rechokePulse, t, RECHOKE_PERIOD_MSEC );
790}
791
792static void
793rechokeNow( Torrent * t )
794{
795    rechokePulse( t );
796    restartChokeTimer( t );
797}
798
799static int
800rechokeSoonCB( void * vt )
801{
802    Torrent * t = vt;
803    rechokeNow( t );
804    t->rechokeSoonTimer = NULL;
805    return FALSE;
806}
807
808static void
809rechokeSoon( Torrent * t )
810{
811    if( t->rechokeSoonTimer == NULL )
812        t->rechokeSoonTimer = tr_timerNew( t->manager->handle,
813                                           rechokeSoonCB, t, SOON_MSEC );
814}
815
816static void
817msgsCallbackFunc( void * vpeer, void * vevent, void * vt )
818{
819    tr_peer * peer = vpeer;
820    Torrent * t = (Torrent *) vt;
821    const tr_peermsgs_event * e = (const tr_peermsgs_event *) vevent;
822    const int needLock = !torrentIsLocked( t );
823
824    if( needLock )
825        torrentLock( t );
826
827    switch( e->eventType )
828    {
829        case TR_PEERMSG_NEED_REQ:
830            if( t->refillTimer == NULL )
831                t->refillTimer = tr_timerNew( t->manager->handle,
832                                              refillPulse, t,
833                                              REFILL_PERIOD_MSEC );
834            break;
835
836        case TR_PEERMSG_CLIENT_HAVE:
837            broadcastClientHave( t, e->pieceIndex );
838            tr_torrentRecheckCompleteness( t->tor );
839            break;
840
841        case TR_PEERMSG_PEER_PROGRESS: { /* if we're both seeds, then disconnect. */
842#if 0
843            const int clientIsSeed = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
844            const int peerIsSeed = e->progress >= 1.0;
845            if( clientIsSeed && peerIsSeed )
846                peer->doPurge = 1;
847#endif
848            break;
849        }
850
851        case TR_PEERMSG_CLIENT_BLOCK:
852            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
853            break;
854
855        case TR_PEERMSG_GOT_ERROR:
856            peer->doPurge = 1;
857            reconnectSoon( t );
858            break;
859
860        default:
861            assert(0);
862    }
863
864    if( needLock )
865        torrentUnlock( t );
866}
867
868static void
869ensureAtomExists( Torrent * t, const struct in_addr * addr, uint16_t port, uint8_t flags, uint8_t from )
870{
871    if( !peerIsKnown( t, addr ) )
872    {
873        struct peer_atom * a = tr_new( struct peer_atom, 1 );
874        a->addr = *addr;
875        a->port = port;
876        a->flags = flags;
877        a->from = from;
878        a->time = 0;
879fprintf( stderr, "torrent [%s] getting a new atom: %s\n", t->tor->info.name, tr_peerIoAddrStr(&a->addr,a->port) );
880        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
881    }
882}
883
884static void
885myHandshakeDoneCB( tr_handshake    * handshake,
886                   tr_peerIo       * io,
887                   int               isConnected,
888                   const uint8_t   * peer_id,
889                   void            * vmanager )
890{
891    int ok = isConnected;
892    uint16_t port;
893    const struct in_addr * in_addr;
894    tr_peerMgr * manager = (tr_peerMgr*) vmanager;
895    const uint8_t * hash = NULL;
896    Torrent * t;
897    tr_handshake * ours;
898
899    assert( io != NULL );
900    assert( isConnected==0 || isConnected==1 );
901
902    ours = tr_ptrArrayRemoveSorted( manager->handshakes,
903                                    handshake,
904                                    handshakeCompare );
905    assert( ours != NULL );
906    assert( ours == handshake );
907
908    in_addr = tr_peerIoGetAddress( io, &port );
909
910    if( !tr_peerIoHasTorrentHash( io ) ) /* incoming connection gone wrong? */
911    {
912        tr_peerIoFree( io );
913        --manager->connectionCount;
914        return;
915    }
916
917    hash = tr_peerIoGetTorrentHash( io );
918    t = getExistingTorrent( manager, hash );
919
920    if( t != NULL )
921        torrentLock( t );
922
923    if( !t || !t->isRunning )
924    {
925        tr_peerIoFree( io );
926        --manager->connectionCount;
927    }
928    else if( !ok )
929    {
930        /* if we couldn't connect or were snubbed,
931         * the peer's probably not worth remembering. */
932        tr_peer * peer = getExistingPeer( t, in_addr );
933        tr_peerIoFree( io );
934        --manager->connectionCount;
935        if( peer )
936            peer->doPurge = 1;
937    }
938    else /* looking good */
939    {
940        tr_peer * peer = getPeer( t, in_addr );
941        uint16_t port;
942        const struct in_addr * addr = tr_peerIoGetAddress( io,  &port );
943        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
944        if( peer->msgs != NULL ) { /* we already have this peer */
945            tr_peerIoFree( io );
946            --manager->connectionCount;
947        } else {
948            peer->port = port;
949            peer->io = io;
950            peer->msgs = tr_peerMsgsNew( t->tor, peer );
951            tr_free( peer->client );
952            peer->client = peer_id ? tr_clientForId( peer_id ) : NULL;
953            peer->msgsTag = tr_peerMsgsSubscribe( peer->msgs, msgsCallbackFunc, t );
954            rechokeSoon( t );
955        }
956    }
957
958    if( t != NULL )
959        torrentUnlock( t );
960}
961
962static void
963initiateHandshake( tr_peerMgr * manager, tr_peerIo * io )
964{
965    tr_handshake * handshake;
966
967    assert( manager->lockThread!=0 );
968    assert( io != NULL );
969
970    handshake = tr_handshakeNew( io,
971                                 manager->handle->encryptionMode,
972                                 myHandshakeDoneCB,
973                                 manager );
974    ++manager->connectionCount;
975
976    tr_ptrArrayInsertSorted( manager->handshakes, handshake, handshakeCompare );
977}
978
979void
980tr_peerMgrAddIncoming( tr_peerMgr      * manager,
981                       struct in_addr  * addr,
982                       uint16_t          port,
983                       int               socket )
984{
985    managerLock( manager );
986
987    if( getExistingHandshake( manager, addr ) == NULL )
988    {
989        tr_peerIo * io = tr_peerIoNewIncoming( manager->handle, addr, port, socket );
990        initiateHandshake( manager, io );
991    }
992
993    managerUnlock( manager );
994}
995
996void
997tr_peerMgrAddPex( tr_peerMgr     * manager,
998                  const uint8_t  * torrentHash,
999                  int              from,
1000                  const tr_pex   * pex,
1001                  int              pexCount )
1002{
1003    Torrent * t;
1004    const tr_pex * end;
1005
1006    managerLock( manager );
1007
1008    t = getExistingTorrent( manager, torrentHash );
1009    for( end=pex+pexCount; pex!=end; ++pex )
1010        ensureAtomExists( t, &pex->in_addr, pex->port, pex->flags, from );
1011    reconnectSoon( t );
1012
1013    managerUnlock( manager );
1014}
1015
1016void
1017tr_peerMgrAddPeers( tr_peerMgr    * manager,
1018                    const uint8_t * torrentHash,
1019                    int             from,
1020                    const uint8_t * peerCompact,
1021                    int             peerCount )
1022{
1023    int i;
1024    const uint8_t * walk = peerCompact;
1025    Torrent * t;
1026
1027    managerLock( manager );
1028
1029    t = getExistingTorrent( manager, torrentHash );
1030    for( i=0; t!=NULL && i<peerCount; ++i )
1031    {
1032        struct in_addr addr;
1033        uint16_t port;
1034        memcpy( &addr, walk, 4 ); walk += 4;
1035        memcpy( &port, walk, 2 ); walk += 2;
1036        ensureAtomExists( t, &addr, port, 0, from );
1037    }
1038    reconnectSoon( t );
1039
1040    managerUnlock( manager );
1041}
1042
1043/**
1044***
1045**/
1046
1047int
1048tr_peerMgrIsAcceptingConnections( const tr_peerMgr * manager UNUSED )
1049{
1050    return TRUE; /* manager->connectionCount < MAX_CONNECTED_PEERS; */
1051}
1052
1053void
1054tr_peerMgrSetBlame( tr_peerMgr     * manager UNUSED,
1055                    const uint8_t  * torrentHash UNUSED,
1056                    int              pieceIndex UNUSED,
1057                    int              success UNUSED )
1058{
1059    /*fprintf( stderr, "FIXME: tr_peerMgrSetBlame\n" );*/
1060}
1061
1062int
1063tr_pexCompare( const void * va, const void * vb )
1064{
1065    const tr_pex * a = (const tr_pex *) va;
1066    const tr_pex * b = (const tr_pex *) vb;
1067    int i = memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
1068    if( i ) return i;
1069    if( a->port < b->port ) return -1;
1070    if( a->port > b->port ) return 1;
1071    return 0;
1072}
1073
1074int tr_pexCompare( const void * a, const void * b );
1075
1076static int
1077peerPrefersCrypto( const tr_peer * peer )
1078{
1079    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1080        return TRUE;
1081
1082    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1083        return FALSE;
1084
1085    return tr_peerIoIsEncrypted( peer->io );
1086};
1087
1088int
1089tr_peerMgrGetPeers( tr_peerMgr      * manager,
1090                    const uint8_t   * torrentHash,
1091                    tr_pex         ** setme_pex )
1092{
1093    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1094    const int isLocked = torrentIsLocked( t );
1095    int i, peerCount;
1096    const tr_peer ** peers;
1097    tr_pex * pex;
1098    tr_pex * walk;
1099
1100    if( !isLocked )
1101        torrentLock( (Torrent*)t );
1102
1103    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1104    pex = walk = tr_new( tr_pex, peerCount );
1105
1106    for( i=0; i<peerCount; ++i, ++walk )
1107    {
1108        const tr_peer * peer = peers[i];
1109
1110        walk->in_addr = peer->in_addr;
1111
1112        walk->port = peer->port;
1113
1114        walk->flags = 0;
1115        if( peerPrefersCrypto(peer) )  walk->flags |= 1;
1116        if( peer->progress >= 1.0 )    walk->flags |= 2;
1117    }
1118
1119    assert( ( walk - pex ) == peerCount );
1120    qsort( pex, peerCount, sizeof(tr_pex), tr_pexCompare );
1121    *setme_pex = pex;
1122
1123    if( !isLocked )
1124        torrentUnlock( (Torrent*)t );
1125
1126    return peerCount;
1127}
1128
1129void
1130tr_peerMgrStartTorrent( tr_peerMgr     * manager,
1131                        const uint8_t  * torrentHash )
1132{
1133    Torrent * t;
1134
1135    managerLock( manager );
1136
1137    t = getExistingTorrent( manager, torrentHash );
1138    t->isRunning = 1;
1139    restartChokeTimer( t );
1140    reconnectSoon( t );
1141
1142    managerUnlock( manager );
1143}
1144
1145static void
1146stopTorrent( Torrent * t )
1147{
1148    int i, size;
1149    tr_peer ** peers;
1150
1151    assert( torrentIsLocked( t ) );
1152
1153    t->isRunning = 0;
1154    tr_timerFree( &t->rechokeTimer );
1155    tr_timerFree( &t->reconnectTimer );
1156
1157    peers = getConnectedPeers( t, &size );
1158    for( i=0; i<size; ++i )
1159        disconnectPeer( peers[i] );
1160
1161    tr_free( peers );
1162}
1163void
1164tr_peerMgrStopTorrent( tr_peerMgr     * manager,
1165                       const uint8_t  * torrentHash)
1166{
1167    managerLock( manager );
1168
1169    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1170
1171    managerUnlock( manager );
1172}
1173
1174void
1175tr_peerMgrAddTorrent( tr_peerMgr * manager,
1176                      tr_torrent * tor )
1177{
1178    Torrent * t;
1179
1180    managerLock( manager );
1181
1182    assert( tor != NULL );
1183    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1184
1185    t = tr_new0( Torrent, 1 );
1186    t->manager = manager;
1187    t->tor = tor;
1188    t->pool = tr_ptrArrayNew( );
1189    t->peers = tr_ptrArrayNew( );
1190    t->requested = tr_bitfieldNew( tor->blockCount );
1191    restartChokeTimer( t );
1192    restartReconnectTimer( t );
1193
1194    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
1195    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1196
1197    managerUnlock( manager );
1198}
1199
1200void
1201tr_peerMgrRemoveTorrent( tr_peerMgr     * manager,
1202                         const uint8_t  * torrentHash )
1203{
1204    Torrent * t;
1205
1206    managerLock( manager );
1207
1208    t = getExistingTorrent( manager, torrentHash );
1209    assert( t != NULL );
1210    stopTorrent( t );
1211    freeTorrent( manager, t );
1212
1213    managerUnlock( manager );
1214}
1215
1216void
1217tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1218                               const uint8_t    * torrentHash,
1219                               int8_t           * tab,
1220                               int                tabCount )
1221{
1222    int i;
1223    const Torrent * t;
1224    const tr_torrent * tor;
1225    float interval;
1226
1227    managerLock( (tr_peerMgr*)manager );
1228
1229    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1230    tor = t->tor;
1231    interval = tor->info.pieceCount / (float)tabCount;
1232
1233    memset( tab, 0, tabCount );
1234
1235    for( i=0; i<tabCount; ++i )
1236    {
1237        const int piece = i * interval;
1238
1239        if( tor == NULL )
1240            tab[i] = 0;
1241        else if( tr_cpPieceIsComplete( tor->completion, piece ) )
1242            tab[i] = -1;
1243        else {
1244            int j, peerCount;
1245            const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1246            for( j=0; j<peerCount; ++j )
1247                if( tr_bitfieldHas( peers[j]->have, i ) )
1248                    ++tab[i];
1249        }
1250    }
1251
1252    managerUnlock( (tr_peerMgr*)manager );
1253}
1254
1255/* Returns the pieces that we and/or a connected peer has */
1256tr_bitfield*
1257tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1258                        const uint8_t    * torrentHash )
1259{
1260    int i, size;
1261    const Torrent * t;
1262    const tr_peer ** peers;
1263    tr_bitfield * pieces;
1264
1265    managerLock( (tr_peerMgr*)manager );
1266
1267    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1268    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1269    pieces = tr_bitfieldDup( tr_cpPieceBitfield( t->tor->completion ) );
1270    for( i=0; i<size; ++i )
1271        if( peers[i]->io != NULL )
1272            tr_bitfieldAnd( pieces, peers[i]->have );
1273
1274    managerUnlock( (tr_peerMgr*)manager );
1275    return pieces;
1276}
1277
1278void
1279tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1280                        const uint8_t    * torrentHash,
1281                        int              * setmePeersTotal,
1282                        int              * setmePeersConnected,
1283                        int              * setmePeersSendingToUs,
1284                        int              * setmePeersGettingFromUs,
1285                        int              * setmePeersFrom )
1286{
1287    int i, size;
1288    const Torrent * t;
1289    const tr_peer ** peers;
1290
1291    managerLock( (tr_peerMgr*)manager );
1292
1293    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1294    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1295
1296    *setmePeersTotal          = size;
1297    *setmePeersConnected      = 0;
1298    *setmePeersSendingToUs    = 0;
1299    *setmePeersGettingFromUs  = 0;
1300
1301    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1302        setmePeersFrom[i] = 0;
1303
1304    for( i=0; i<size; ++i )
1305    {
1306        const tr_peer * peer = peers[i];
1307
1308        if( peer->io == NULL ) /* not connected */
1309            continue;
1310
1311        ++*setmePeersConnected;
1312
1313        ++setmePeersFrom[peer->from];
1314
1315        if( tr_peerIoGetRateToPeer( peer->io ) > 0.01 )
1316            ++*setmePeersGettingFromUs;
1317
1318        if( tr_peerIoGetRateToClient( peer->io ) > 0.01 )
1319            ++*setmePeersSendingToUs;
1320    }
1321
1322    managerUnlock( (tr_peerMgr*)manager );
1323}
1324
1325struct tr_peer_stat *
1326tr_peerMgrPeerStats( const tr_peerMgr  * manager,
1327                     const uint8_t     * torrentHash,
1328                     int               * setmeCount UNUSED )
1329{
1330    int i, size;
1331    const Torrent * t;
1332    const tr_peer ** peers;
1333    tr_peer_stat * ret;
1334
1335    assert( manager != NULL );
1336    managerLock( (tr_peerMgr*)manager );
1337
1338    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1339    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1340
1341    ret = tr_new0( tr_peer_stat, size );
1342
1343    for( i=0; i<size; ++i )
1344    {
1345        const tr_peer * peer = peers[i];
1346        const int live = peer->io != NULL;
1347        tr_peer_stat * stat = ret + i;
1348
1349        tr_netNtop( &peer->in_addr, stat->addr, sizeof(stat->addr) );
1350        stat->port             = peer->port;
1351        stat->from             = peer->from;
1352        stat->client           = peer->client;
1353        stat->progress         = peer->progress;
1354        stat->isConnected      = live ? 1 : 0;
1355        stat->isEncrypted      = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1356        stat->uploadToRate     = tr_peerIoGetRateToPeer( peer->io );
1357        stat->downloadFromRate = tr_peerIoGetRateToClient( peer->io );
1358        stat->isDownloading    = stat->uploadToRate > 0.01;
1359        stat->isUploading      = stat->downloadFromRate > 0.01;
1360    }
1361
1362    *setmeCount = size;
1363
1364    managerUnlock( (tr_peerMgr*)manager );
1365    return ret;
1366}
1367
1368/**
1369***
1370**/
1371
1372typedef struct
1373{
1374    tr_peer * peer;
1375    float rate;
1376    int randomKey;
1377    int preferred;
1378    int doUnchoke;
1379}
1380ChokeData;
1381
1382static int
1383compareChoke( const void * va, const void * vb )
1384{
1385    const ChokeData * a = ( const ChokeData * ) va;
1386    const ChokeData * b = ( const ChokeData * ) vb;
1387
1388    if( a->preferred != b->preferred )
1389        return a->preferred ? -1 : 1;
1390
1391    if( a->preferred )
1392    {
1393        if( a->rate > b->rate ) return -1;
1394        if( a->rate < b->rate ) return 1;
1395        return 0;
1396    }
1397    else
1398    {
1399        return a->randomKey - b->randomKey;
1400    }
1401}
1402
1403static int
1404clientIsSnubbedBy( const tr_peer * peer )
1405{
1406    assert( peer != NULL );
1407
1408    return peer->peerSentPieceDataAt < (time(NULL) - SNUBBED_SEC);
1409}
1410
1411/**
1412***
1413**/
1414
1415static void
1416rechokeLeech( Torrent * t )
1417{
1418    int i, peerCount, size=0, unchoked=0;
1419    const time_t ignorePeersNewerThan = time(NULL) - MIN_CHOKE_PERIOD_SEC;
1420    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1421    ChokeData * choke = tr_new0( ChokeData, peerCount );
1422
1423    assert( torrentIsLocked( t ) );
1424   
1425    /* sort the peers by preference and rate */
1426    for( i=0; i<peerCount; ++i )
1427    {
1428        tr_peer * peer = peers[i];
1429        ChokeData * node;
1430        if( peer->chokeChangedAt > ignorePeersNewerThan )
1431            continue;
1432
1433        node = &choke[size++];
1434        node->peer = peer;
1435        node->preferred = peer->peerIsInterested && !clientIsSnubbedBy(peer);
1436        node->randomKey = tr_rand( INT_MAX );
1437        node->rate = tr_peerIoGetRateToClient( peer->io );
1438    }
1439
1440    qsort( choke, size, sizeof(ChokeData), compareChoke );
1441
1442    for( i=0; i<size && i<NUM_UNCHOKED_PEERS_PER_TORRENT; ++i ) {
1443        choke[i].doUnchoke = 1;
1444        ++unchoked;
1445    }
1446
1447    for( ; i<size; ++i ) {
1448        choke[i].doUnchoke = 1;
1449        ++unchoked;
1450        if( choke[i].peer->peerIsInterested )
1451            break;
1452    }
1453
1454    for( i=0; i<size; ++i )
1455        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1456
1457    /* cleanup */
1458    tr_free( choke );
1459    tr_free( peers );
1460}
1461
1462static void
1463rechokeSeed( Torrent * t )
1464{
1465    int i, size;
1466    tr_peer ** peers;
1467
1468    assert( torrentIsLocked( t ) );
1469
1470    peers = getConnectedPeers( t, &size );
1471
1472    /* FIXME */
1473    for( i=0; i<size; ++i )
1474        tr_peerMsgsSetChoke( peers[i]->msgs, FALSE );
1475
1476    tr_free( peers );
1477}
1478
1479static int
1480rechokePulse( void * vtorrent )
1481{
1482    Torrent * t = vtorrent;
1483    torrentLock( t );
1484
1485    const int done = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1486    if( done )
1487        rechokeLeech( vtorrent );
1488    else
1489        rechokeSeed( vtorrent );
1490
1491    torrentUnlock( t );
1492    return TRUE;
1493}
1494
1495/***
1496****
1497****
1498****
1499***/
1500
1501struct tr_connection
1502{
1503    tr_peer * peer;
1504    double throughput;
1505};
1506
1507#define LAISSEZ_FAIRE_PERIOD_SECS 60
1508
1509static int
1510compareConnections( const void * va, const void * vb )
1511{
1512    const struct tr_connection * a = va;
1513    const struct tr_connection * b = vb;
1514    if( a->throughput < b->throughput ) return -1;
1515    if( a->throughput > b->throughput ) return 1;
1516    return 0;
1517}
1518
1519static struct tr_connection *
1520getWeakConnections( Torrent * t, int * setmeSize )
1521{
1522    int i, insize, outsize;
1523    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( t->peers, &insize );
1524    struct tr_connection * ret = tr_new( struct tr_connection, insize );
1525    const int clientIsSeed = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1526    const time_t now = time( NULL );
1527
1528    assert( torrentIsLocked( t ) );
1529
1530    for( i=outsize=0; i<insize; ++i )
1531    {
1532        tr_peer * peer = peers[i];
1533        int isWeak;
1534        const int peerIsSeed = peer->progress >= 1.0;
1535        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1536        const double throughput = (2*tr_peerIoGetRateToPeer( peer->io ))
1537                                + tr_peerIoGetRateToClient( peer->io );
1538
1539        assert( atom != NULL );
1540
1541        /* if we're both seeds, give a little bit of time for
1542         * a mutual pex -- peer-msgs initiates a pex exchange
1543         * on startup -- and then disconnect */
1544        if( peerIsSeed && clientIsSeed && (now-atom->time >= 30) )
1545            isWeak = TRUE;
1546        else if( ( now - atom->time ) < LAISSEZ_FAIRE_PERIOD_SECS )
1547            isWeak = FALSE;
1548        else if( throughput >= 5 )
1549            isWeak = FALSE;
1550        else
1551            isWeak = TRUE;
1552
1553        if( isWeak )
1554        {
1555            ret[outsize].peer = peer;
1556            ret[outsize].throughput = throughput;
1557            ++outsize;
1558        }
1559    }
1560
1561    qsort( ret, outsize, sizeof(struct tr_connection), compareConnections );
1562    *setmeSize = outsize;
1563    return ret;
1564}
1565
1566static int
1567compareAtomByTime( const void * va, const void * vb )
1568{
1569    const struct peer_atom * a = * (const struct peer_atom**) va;
1570    const struct peer_atom * b = * (const struct peer_atom**) vb;
1571    if( a->time < b->time ) return -1;
1572    if( a->time > b->time ) return 1;
1573    return 0;
1574}
1575
1576static struct peer_atom **
1577getPeerCandidates( Torrent * t, int * setmeSize )
1578{
1579    int i, insize, outsize;
1580    struct peer_atom ** atoms;
1581    struct peer_atom ** ret;
1582    const time_t now = time( NULL );
1583    const int seed = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1584
1585    assert( torrentIsLocked( t ) );
1586
1587    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &insize );
1588    ret = tr_new( struct peer_atom*, insize );
1589    for( i=outsize=0; i<insize; ++i )
1590    {
1591        struct peer_atom * atom = atoms[i];
1592
1593        /* we don't need two connections to the same peer... */
1594        if( peerIsInUse( t, &atom->addr ) ) {
1595            fprintf( stderr, "RECONNECT peer %d (%s) is in use...\n", i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1596            continue;
1597        }
1598
1599        /* no need to connect if we're both seeds... */
1600        if( seed && ( atom->flags & 2 ) ) {
1601            fprintf( stderr, "RECONNECT peer %d (%s) is a seed and so are we...\n", i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1602            continue;
1603        }
1604
1605        /* if we used this peer recently, give someone else a turn */
1606        if( ( now - atom->time ) <  LAISSEZ_FAIRE_PERIOD_SECS ) {
1607            fprintf( stderr, "RECONNECT peer %d (%s) is in its grace period...\n", i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1608            continue;
1609        }
1610
1611        ret[outsize++] = atom;
1612    }
1613
1614    qsort( ret, outsize, sizeof(struct peer_atom*), compareAtomByTime );
1615    *setmeSize = outsize;
1616    return ret;
1617}
1618
1619static int
1620reconnectPulse( void * vtorrent )
1621{
1622    Torrent * t = vtorrent;
1623    struct peer_atom ** candidates;
1624    struct tr_connection * connections;
1625    int i, nCandidates, nConnections, nCull, nAdd;
1626    int peerCount;
1627
1628    torrentLock( t );
1629
1630    connections = getWeakConnections( t, &nConnections );
1631    candidates = getPeerCandidates( t, &nCandidates );
1632
1633    /* figure out how many peers to disconnect */
1634    nCull = nConnections-4; 
1635
1636fprintf( stderr, "RECONNECT pulse for [%s]: %d connections, %d candidates, %d atoms, %d cull\n", t->tor->info.name, nConnections, nCandidates, tr_ptrArraySize(t->pool), nCull );
1637
1638for( i=0; i<nConnections; ++i )
1639fprintf( stderr, "connection #%d: %s @ %.2f\n", i+1, tr_peerIoAddrStr( &connections[i].peer->in_addr, connections[i].peer->port ), connections[i].throughput );
1640
1641    /* disconnect some peers */
1642    for( i=0; i<nCull && i<nConnections; ++i ) {
1643        const double throughput = connections[i].throughput;
1644        tr_peer * peer = connections[i].peer;
1645        fprintf( stderr, "RECONNECT culling peer %s, whose throughput was %f\n", tr_peerIoAddrStr(&peer->in_addr, peer->port), throughput );
1646        removePeer( t, peer );
1647    }
1648
1649    /* add some new ones */
1650    peerCount = tr_ptrArraySize( t->peers );
1651    nAdd = MAX_CONNECTED_PEERS_PER_TORRENT - peerCount;
1652    for( i=0; i<nAdd && i<nCandidates; ++i ) {
1653        struct peer_atom * atom = candidates[i];
1654        tr_peerIo * io = tr_peerIoNewOutgoing( t->manager->handle, &atom->addr, atom->port, t->hash );
1655fprintf( stderr, "RECONNECT adding an outgoing connection...\n" );
1656        initiateHandshake( t->manager, io );
1657        atom->time = time( NULL );
1658    }
1659
1660    /* cleanup */
1661    tr_free( connections );
1662    tr_free( candidates );
1663    torrentUnlock( t );
1664    return TRUE;
1665}
Note: See TracBrowser for help on using the repository browser.