source: trunk/libtransmission/peer-mgr.c @ 3247

Last change on this file since 3247 was 3247, checked in by charles, 15 years ago

fix some memory issues.

  • Property svn:keywords set to Date Rev Author Id
File size: 43.6 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 3247 2007-10-01 04:12:24Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17#include <limits.h> /* INT_MAX */
18
19#include "transmission.h"
20#include "clients.h"
21#include "completion.h"
22#include "crypto.h"
23#include "handshake.h"
24#include "net.h"
25#include "peer-io.h"
26#include "peer-mgr.h"
27#include "peer-mgr-private.h"
28#include "peer-msgs.h"
29#include "platform.h"
30#include "ptrarray.h"
31#include "ratecontrol.h"
32#include "trevent.h"
33#include "utils.h"
34
35#include "pthread.h"
36
37enum
38{
39    /* how frequently to change which peers are choked */
40    RECHOKE_PERIOD_MSEC = (15 * 1000),
41
42    /* how frequently to decide which peers live and die */
43    RECONNECT_PERIOD_MSEC = (15 * 1000),
44
45    /* how frequently to refill peers' request lists */
46    REFILL_PERIOD_MSEC = 1000,
47
48    /* how many peers to unchoke per-torrent. */
49    /* FIXME: make this user-configurable? */
50    NUM_UNCHOKED_PEERS_PER_TORRENT = 8,
51
52    /* don't change a peer's choke status more often than this */
53    MIN_CHOKE_PERIOD_SEC = 10,
54
55    /* how soon is `soon' in the rechokeSoon, reconnecSoon funcs */
56    SOON_MSEC = 1000,
57
58    /* following the BT spec, we consider ourselves `snubbed' if
59     * we're we don't get piece data from a peer in this long */
60    SNUBBED_SEC = 60,
61
62    /* this is arbitrary and, hopefully, temporary until we come up
63     * with a better idea for managing the connection limits */
64    MAX_CONNECTED_PEERS_PER_TORRENT = 60,
65};
66
67/**
68***
69**/
70
71/* We keep one of these for every peer we know about, whether
72 * it's connected or not, so the struct must be small.
73 * When our current connections underperform, we dip back
74 * int this list for new ones. */
75struct peer_atom
76{   
77    uint8_t from;
78    uint8_t flags; /* these match the added_f flags */
79    uint16_t port;
80    struct in_addr addr; 
81    time_t time;
82};
83
84typedef struct
85{
86    uint8_t hash[SHA_DIGEST_LENGTH];
87    tr_ptrArray * pool; /* struct peer_atom */
88    tr_ptrArray * peers; /* tr_peer */
89    tr_timer * reconnectTimer;
90    tr_timer * reconnectSoonTimer;
91    tr_timer * rechokeTimer;
92    tr_timer * rechokeSoonTimer;
93    tr_timer * refillTimer;
94    tr_torrent * tor;
95    tr_bitfield * requested;
96
97    unsigned int isRunning : 1;
98
99    struct tr_peerMgr * manager;
100}
101Torrent;
102
103struct tr_peerMgr
104{
105    tr_handle * handle;
106    tr_ptrArray * torrents; /* Torrent */
107    int connectionCount;
108    tr_ptrArray * handshakes; /* in-process */
109    tr_lock * lock;
110    pthread_t lockThread;
111};
112
113/**
114***
115**/
116
117static void
118managerLock( struct tr_peerMgr * manager )
119{
120    assert( manager->lockThread != pthread_self() );
121    tr_lockLock( manager->lock );
122    manager->lockThread = pthread_self();
123}
124static void
125managerUnlock( struct tr_peerMgr * manager )
126{
127    assert( manager->lockThread == pthread_self() );
128    manager->lockThread = 0;
129    tr_lockUnlock( manager->lock );
130}
131static void
132torrentLock( Torrent * torrent )
133{
134    managerLock( torrent->manager );
135}
136static void
137torrentUnlock( Torrent * torrent )
138{
139    managerUnlock( torrent->manager );
140}
141static int
142torrentIsLocked( const Torrent * t )
143{
144    return ( t != NULL )
145        && ( t->manager != NULL )
146        && ( t->manager->lockThread != 0 )
147        && ( t->manager->lockThread == pthread_self( ) );
148}
149
150/**
151***
152**/
153
154static int
155compareAddresses( const struct in_addr * a, const struct in_addr * b )
156{
157    return tr_compareUint32( a->s_addr, b->s_addr );
158}
159
160static int
161handshakeCompareToAddr( const void * va, const void * vb )
162{
163    const tr_handshake * a = va;
164    return compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
165}
166
167static int
168handshakeCompare( const void * a, const void * b )
169{
170    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
171}
172
173static tr_handshake*
174getExistingHandshake( tr_peerMgr * mgr, const struct in_addr * in_addr )
175{
176    return tr_ptrArrayFindSorted( mgr->handshakes,
177                                  in_addr,
178                                  handshakeCompareToAddr );
179}
180
181static int
182comparePeerAtomToAddress( const void * va, const void * vb )
183{
184    const struct peer_atom * a = va;
185    return compareAddresses( &a->addr, vb );
186}
187
188static int
189comparePeerAtoms( const void * va, const void * vb )
190{
191    const struct peer_atom * b = vb;
192    return comparePeerAtomToAddress( va, &b->addr );
193}
194
195/**
196***
197**/
198
199static int
200torrentCompare( const void * va, const void * vb )
201{
202    const Torrent * a = va;
203    const Torrent * b = vb;
204    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
205}
206
207static int
208torrentCompareToHash( const void * va, const void * vb )
209{
210    const Torrent * a = (const Torrent*) va;
211    const uint8_t * b_hash = (const uint8_t*) vb;
212    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
213}
214
215static Torrent*
216getExistingTorrent( tr_peerMgr * manager, const uint8_t * hash )
217{
218    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
219                                             hash,
220                                             torrentCompareToHash );
221}
222
223static int
224peerCompare( const void * va, const void * vb )
225{
226    const tr_peer * a = (const tr_peer *) va;
227    const tr_peer * b = (const tr_peer *) vb;
228    return compareAddresses( &a->in_addr, &b->in_addr );
229}
230
231static int
232peerCompareToAddr( const void * va, const void * vb )
233{
234    const tr_peer * a = (const tr_peer *) va;
235    return compareAddresses( &a->in_addr, vb );
236}
237
238static tr_peer*
239getExistingPeer( Torrent * torrent, const struct in_addr * in_addr )
240{
241    assert( torrentIsLocked( torrent ) );
242    assert( in_addr != NULL );
243
244    return (tr_peer*) tr_ptrArrayFindSorted( torrent->peers,
245                                             in_addr,
246                                             peerCompareToAddr );
247}
248
249static struct peer_atom*
250getExistingAtom( const Torrent * t, const struct in_addr * addr )
251{
252    assert( torrentIsLocked( t ) );
253    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
254}
255
256static int
257peerIsKnown( const Torrent * t, const struct in_addr * addr )
258{
259    return getExistingAtom( t, addr ) != NULL;
260}
261
262static int
263peerIsInUse( const Torrent * t, const struct in_addr * addr )
264{
265    assert( torrentIsLocked ( t ) );
266
267    return ( getExistingPeer( (Torrent*)t, addr ) != NULL )
268        || ( getExistingHandshake( ((Torrent*)t)->manager, addr ) != NULL );
269}
270
271static tr_peer*
272getPeer( Torrent * torrent, const struct in_addr * in_addr )
273{
274    tr_peer * peer;
275
276    assert( torrentIsLocked( torrent ) );
277
278    peer = getExistingPeer( torrent, in_addr );
279
280    if( peer == NULL )
281    {
282        peer = tr_new0( tr_peer, 1 );
283        memcpy( &peer->in_addr, in_addr, sizeof(struct in_addr) );
284        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
285    }
286
287    return peer;
288}
289
290static void
291disconnectPeer( tr_peer * peer )
292{
293    assert( peer != NULL );
294
295    tr_peerIoFree( peer->io );
296    peer->io = NULL;
297
298    if( peer->msgs != NULL )
299    {
300        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
301        tr_peerMsgsFree( peer->msgs );
302        peer->msgs = NULL;
303    }
304
305    tr_bitfieldFree( peer->have );
306    peer->have = NULL;
307
308    tr_bitfieldFree( peer->blame );
309    peer->blame = NULL;
310
311    tr_bitfieldFree( peer->banned );
312    peer->banned = NULL;
313}
314
315static void
316freePeer( tr_peer * peer )
317{
318    disconnectPeer( peer );
319    tr_free( peer->client );
320    tr_free( peer );
321}
322
323static void
324removePeer( Torrent * t, tr_peer * peer )
325{
326    tr_peer * removed;
327    struct peer_atom * atom;
328
329    assert( torrentIsLocked( t ) );
330
331    atom = getExistingAtom( t, &peer->in_addr );
332    assert( atom != NULL );
333    atom->time = time( NULL );
334
335    removed = tr_ptrArrayRemoveSorted  ( t->peers, peer, peerCompare );
336    assert( removed == peer );
337    freePeer( removed );
338}
339
340static void
341freeTorrent( tr_peerMgr * manager, Torrent * t )
342{
343    uint8_t hash[SHA_DIGEST_LENGTH];
344
345    assert( t != NULL );
346    assert( t->peers != NULL );
347    assert( torrentIsLocked( t ) );
348    assert( getExistingTorrent( manager, t->hash ) != NULL );
349
350    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
351
352    tr_timerFree( &t->reconnectTimer );
353    tr_timerFree( &t->reconnectSoonTimer );
354    tr_timerFree( &t->rechokeTimer );
355    tr_timerFree( &t->rechokeSoonTimer );
356    tr_timerFree( &t->refillTimer );
357
358    tr_bitfieldFree( t->requested );
359    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
360    tr_ptrArrayFree( t->peers, (PtrArrayForeachFunc)freePeer );
361    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
362    tr_free( t );
363
364    assert( getExistingTorrent( manager, hash ) == NULL );
365}
366
367/**
368***
369**/
370
371struct tr_bitfield *
372tr_peerMgrGenerateAllowedSet( const uint32_t         setCount,
373                              const uint32_t         pieceCount,
374                              const uint8_t          infohash[20],
375                              const struct in_addr * ip )
376{
377    /* This has been checked against the spec example implementation. Feeding it with :
378    setCount = 9, pieceCount = 1313, infohash = Oxaa,0xaa,...0xaa, ip = 80.4.4.200
379generate :
380    1059, 431, 808, 1217, 287, 376, 1188, 353, 508
381    but since we're storing in a bitfield, it won't be in this order... */
382    /* TODO : We should translate link-local IPv4 adresses to external IP,
383     * so that being on same local network gives us the same allowed pieces */
384   
385    printf( "%d piece allowed fast set for torrent with %d pieces and hex infohash\n", setCount, pieceCount );
386    printf( "%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x for node with IP %s:\n",
387            infohash[0], infohash[1], infohash[2], infohash[3], infohash[4], infohash[5], infohash[6], infohash[7], infohash[8], infohash[9],
388            infohash[10], infohash[11], infohash[12], infohash[13], infohash[14], infohash[15], infohash[16], infohash[7], infohash[18], infohash[19],
389            inet_ntoa( *ip ) );
390   
391    uint8_t *seed = malloc(4 + SHA_DIGEST_LENGTH);
392    char buf[4];
393    uint32_t allowedPieceCount = 0;
394    tr_bitfield_t * ret;
395   
396    ret = tr_bitfieldNew( pieceCount );
397   
398    /* We need a seed based on most significant bytes of peer address
399        concatenated with torrent's infohash */
400    *(uint32_t*)buf = ntohl( htonl(ip->s_addr) & 0xffffff00 );
401   
402    memcpy( seed, &buf, 4 );
403    memcpy( seed + 4, infohash, SHA_DIGEST_LENGTH );
404   
405    tr_sha1( seed, seed, 4 + SHA_DIGEST_LENGTH, NULL );
406   
407    while ( allowedPieceCount < setCount )
408    {
409        int i;
410        for ( i = 0 ; i < 5 && allowedPieceCount < setCount ; i++ )
411        {
412            /* We generate indices from 4-byte chunks of the seed */
413            uint32_t j = i * 4;
414            uint32_t y = ntohl( *(uint32_t*)(seed + j) );
415            uint32_t index = y % pieceCount;
416           
417            if ( !tr_bitfieldHas( ret, index ) )
418            {
419                tr_bitfieldAdd( ret, index );
420                allowedPieceCount++;
421            }
422        }
423        /* We randomize the seed, in case we need to iterate more */
424        tr_sha1( seed, seed, SHA_DIGEST_LENGTH, NULL );
425    }
426    tr_free( seed );
427   
428    return ret;
429}
430
431tr_peerMgr*
432tr_peerMgrNew( tr_handle * handle )
433{
434    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
435    m->handle = handle;
436    m->torrents = tr_ptrArrayNew( );
437    m->handshakes = tr_ptrArrayNew( );
438    m->lock = tr_lockNew( );
439    return m;
440}
441
442void
443tr_peerMgrFree( tr_peerMgr * manager )
444{
445    managerLock( manager );
446
447    tr_ptrArrayFree( manager->handshakes, (PtrArrayForeachFunc)tr_handshakeAbort );
448    tr_ptrArrayFree( manager->torrents, (PtrArrayForeachFunc)freeTorrent );
449
450    managerUnlock( manager );
451    tr_lockFree( manager->lock );
452    tr_free( manager );
453}
454
455static tr_peer**
456getConnectedPeers( Torrent * t, int * setmeCount )
457{
458    int i, peerCount, connectionCount;
459    tr_peer **peers;
460    tr_peer **ret;
461
462    assert( torrentIsLocked( t ) );
463
464    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
465    ret = tr_new( tr_peer*, peerCount );
466
467    for( i=connectionCount=0; i<peerCount; ++i )
468        if( peers[i]->msgs != NULL )
469            ret[connectionCount++] = peers[i];
470
471    *setmeCount = connectionCount;
472    return ret;
473}
474
475/***
476****  Refill
477***/
478
479struct tr_refill_piece
480{
481    tr_priority_t priority;
482    uint32_t piece;
483    uint32_t peerCount;
484    uint32_t fastAllowed;
485};
486
487static int
488compareRefillPiece (const void * aIn, const void * bIn)
489{
490    const struct tr_refill_piece * a = aIn;
491    const struct tr_refill_piece * b = bIn;
492
493    /* if one piece has a higher priority, it goes first */
494    if (a->priority != b->priority)
495        return a->priority > b->priority ? -1 : 1;
496
497    /* otherwise if one has fewer peers, it goes first */
498    if (a->peerCount != b->peerCount)
499        return a->peerCount < b->peerCount ? -1 : 1;
500   
501    /* otherwise if one *might be* fastallowed to us */
502    if (a->fastAllowed != b->fastAllowed)
503        return a->fastAllowed < b->fastAllowed ? -1 : 1;
504
505    /* otherwise go with the earlier piece */
506    return a->piece - b->piece;
507}
508
509static int
510isPieceInteresting( const tr_torrent  * tor,
511                    int                 piece )
512{
513    if( tor->info.pieces[piece].dnd ) /* we don't want it */
514        return 0;
515
516    if( tr_cpPieceIsComplete( tor->completion, piece ) ) /* we already have it */
517        return 0;
518
519    return 1;
520}
521
522static uint32_t*
523getPreferredPieces( Torrent     * t,
524                    uint32_t    * pieceCount )
525{
526    const tr_torrent * tor = t->tor;
527    const tr_info * inf = &tor->info;
528    int i;
529    uint32_t poolSize = 0;
530    uint32_t * pool = tr_new( uint32_t, inf->pieceCount );
531    int peerCount;
532    tr_peer** peers;
533
534    assert( torrentIsLocked( t ) );
535
536    peers = getConnectedPeers( t, &peerCount );
537
538    for( i=0; i<inf->pieceCount; ++i )
539        if( isPieceInteresting( tor, i ) )
540            pool[poolSize++] = i;
541
542    /* sort the pool from most interesting to least... */
543    if( poolSize > 1 )
544    {
545        uint32_t j;
546        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
547
548        for( j=0; j<poolSize; ++j )
549        {
550            int k;
551            const int piece = pool[j];
552            struct tr_refill_piece * setme = p + j;
553
554            setme->piece = piece;
555            setme->priority = inf->pieces[piece].priority;
556            setme->peerCount = 0;
557            setme->fastAllowed = 0;
558            /* FIXME */
559//            setme->fastAllowed = tr_bitfieldHas( t->tor->allowedList, i);
560
561            for( k=0; k<peerCount; ++k ) {
562                const tr_peer * peer = peers[k];
563                if( peer->peerIsInterested && !peer->clientIsChoked && tr_bitfieldHas( peer->have, piece ) )
564                    ++setme->peerCount;
565            }
566        }
567
568        qsort (p, poolSize, sizeof(struct tr_refill_piece), compareRefillPiece);
569
570        for( j=0; j<poolSize; ++j )
571            pool[j] = p[j].piece;
572
573        tr_free( p );
574    }
575
576#if 0
577fprintf (stderr, "new pool: ");
578for (i=0; i<15 && i<(int)poolSize; ++i ) fprintf (stderr, "%d, ", (int)pool[i] );
579fprintf (stderr, "\n");
580#endif
581    tr_free( peers );
582
583    *pieceCount = poolSize;
584    return pool;
585}
586
587static uint64_t*
588getPreferredBlocks( Torrent * t, uint64_t * setmeCount )
589{
590    uint32_t i;
591    uint32_t pieceCount;
592    uint32_t * pieces;
593    uint64_t *req, *unreq, *ret, *walk;
594    int reqCount, unreqCount;
595    const tr_torrent * tor = t->tor;
596
597    assert( torrentIsLocked( t ) );
598
599    pieces = getPreferredPieces( t, &pieceCount );
600/*fprintf( stderr, "REFILL refillPulse for {%s} got %d of %d pieces\n", tor->info.name, (int)pieceCount, t->tor->info.pieceCount );*/
601
602    req = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
603    reqCount = 0;
604    unreq = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
605    unreqCount = 0;
606
607    for( i=0; i<pieceCount; ++i ) {
608        const uint32_t index = pieces[i];
609        const int begin = tr_torPieceFirstBlock( tor, index );
610        const int end = begin + tr_torPieceCountBlocks( tor, (int)index );
611        int block;
612        for( block=begin; block<end; ++block )
613            if( tr_cpBlockIsComplete( tor->completion, block ) )
614                continue;
615            else if( tr_bitfieldHas( t->requested, block ) )
616                req[reqCount++] = block;
617            else
618                unreq[unreqCount++] = block;
619    }
620
621/*fprintf( stderr, "REFILL refillPulse for {%s} reqCount is %d, unreqCount is %d\n", tor->info.name, (int)reqCount, (int)unreqCount );*/
622    ret = walk = tr_new( uint64_t, unreqCount + reqCount );
623    memcpy( walk, unreq, sizeof(uint64_t) * unreqCount );
624    walk += unreqCount;
625    memcpy( walk, req, sizeof(uint64_t) * reqCount );
626    walk += reqCount;
627    assert( ( walk - ret ) == ( unreqCount + reqCount ) );
628    *setmeCount = walk - ret;
629
630    tr_free( req );
631    tr_free( unreq );
632    tr_free( pieces );
633
634    return ret;
635}
636
637static int
638refillPulse( void * vtorrent )
639{
640    Torrent * t = vtorrent;
641    tr_torrent * tor = t->tor;
642    uint32_t i;
643    int peerCount;
644    tr_peer ** peers;
645    uint64_t blockCount;
646    uint64_t * blocks;
647
648    if( !t->isRunning )
649        return TRUE;
650    if( tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE )
651        return TRUE;
652
653    torrentLock( t );
654
655    blocks = getPreferredBlocks( t, &blockCount );
656    peers = getConnectedPeers( t, &peerCount );
657
658/*fprintf( stderr, "REFILL refillPulse for {%s} got %d blocks\n", tor->info.name, (int)blockCount );*/
659
660    for( i=0; peerCount && i<blockCount; ++i )
661    {
662        const int block = blocks[i];
663        const uint32_t index = tr_torBlockPiece( tor, block );
664        const uint32_t begin = (block * tor->blockSize) - (index * tor->info.pieceSize);
665        const uint32_t length = tr_torBlockCountBytes( tor, block );
666        int j;
667        assert( _tr_block( tor, index, begin ) == block );
668        assert( begin < (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
669        assert( (begin + length) <= (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
670
671
672        /* find a peer who can ask for this block */
673        for( j=0; j<peerCount; )
674        {
675            const int val = tr_peerMsgsAddRequest( peers[j]->msgs, index, begin, length );
676            switch( val )
677            {
678                case TR_ADDREQ_FULL: 
679                case TR_ADDREQ_CLIENT_CHOKED:
680                    memmove( peers+j, peers+j+1, sizeof(tr_peer*)*(--peerCount-j) );
681                    break;
682
683                case TR_ADDREQ_MISSING: 
684                    ++j;
685                    break;
686
687                case TR_ADDREQ_OK:
688                    /*fprintf( stderr, "REFILL peer %p took the request for block %d\n", peers[j]->msgs, block );*/
689                    tr_bitfieldAdd( t->requested, block );
690                    j = peerCount;
691                    break;
692
693                default:
694                    assert( 0 && "unhandled value" );
695                    break;
696            }
697        }
698    }
699
700    /* cleanup */
701    tr_free( peers );
702    tr_free( blocks );
703
704    t->refillTimer = NULL;
705
706    torrentUnlock( t );
707    return FALSE;
708}
709
710static void
711broadcastClientHave( Torrent * t, uint32_t index )
712{
713    int i, size;
714    tr_peer ** peers;
715
716    assert( torrentIsLocked( t ) );
717
718    peers = getConnectedPeers( t, &size );
719    for( i=0; i<size; ++i )
720        tr_peerMsgsHave( peers[i]->msgs, index );
721    tr_free( peers );
722}
723
724static void
725broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
726{
727    int i, size;
728    tr_peer ** peers;
729
730    assert( torrentIsLocked( t ) );
731
732    peers = getConnectedPeers( t, &size );
733    for( i=0; i<size; ++i )
734        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
735    tr_free( peers );
736}
737
738/**
739***
740**/
741
742static int reconnectPulse( void * vtorrent );
743
744static void
745restartReconnectTimer( Torrent * t )
746{
747    tr_timerFree( &t->reconnectTimer );
748    if( t->isRunning )
749        t->reconnectTimer = tr_timerNew( t->manager->handle, reconnectPulse, t, RECONNECT_PERIOD_MSEC );
750}
751
752static void
753reconnectNow( Torrent * t )
754{
755    reconnectPulse( t );
756    restartReconnectTimer( t );
757}
758
759static int
760reconnectSoonCB( void * vt )
761{
762    Torrent * t = vt;
763    reconnectNow( t );
764    t->reconnectSoonTimer = NULL;
765    return FALSE;
766}
767
768static void
769reconnectSoon( Torrent * t )
770{
771    if( t->reconnectSoonTimer == NULL )
772        t->reconnectSoonTimer = tr_timerNew( t->manager->handle,
773                                             reconnectSoonCB, t, SOON_MSEC );
774}
775
776/**
777***
778**/
779
780static int rechokePulse( void * vtorrent );
781
782static void
783restartChokeTimer( Torrent * t )
784{
785    tr_timerFree( &t->rechokeTimer );
786    if( t->isRunning )
787        t->rechokeTimer = tr_timerNew( t->manager->handle, rechokePulse, t, RECHOKE_PERIOD_MSEC );
788}
789
790static void
791rechokeNow( Torrent * t )
792{
793    rechokePulse( t );
794    restartChokeTimer( t );
795}
796
797static int
798rechokeSoonCB( void * vt )
799{
800    Torrent * t = vt;
801    rechokeNow( t );
802    t->rechokeSoonTimer = NULL;
803    return FALSE;
804}
805
806static void
807rechokeSoon( Torrent * t )
808{
809    if( t->rechokeSoonTimer == NULL )
810        t->rechokeSoonTimer = tr_timerNew( t->manager->handle,
811                                           rechokeSoonCB, t, SOON_MSEC );
812}
813
814static void
815msgsCallbackFunc( void * vpeer, void * vevent, void * vt )
816{
817    tr_peer * peer = vpeer;
818    Torrent * t = (Torrent *) vt;
819    const tr_peermsgs_event * e = (const tr_peermsgs_event *) vevent;
820    const int needLock = !torrentIsLocked( t );
821
822    if( needLock )
823        torrentLock( t );
824
825    switch( e->eventType )
826    {
827        case TR_PEERMSG_NEED_REQ:
828            if( t->refillTimer == NULL )
829                t->refillTimer = tr_timerNew( t->manager->handle,
830                                              refillPulse, t,
831                                              REFILL_PERIOD_MSEC );
832            break;
833
834        case TR_PEERMSG_CLIENT_HAVE:
835            broadcastClientHave( t, e->pieceIndex );
836            tr_torrentRecheckCompleteness( t->tor );
837            break;
838
839        case TR_PEERMSG_PEER_PROGRESS: { /* if we're both seeds, then disconnect. */
840#if 0
841            const int clientIsSeed = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
842            const int peerIsSeed = e->progress >= 1.0;
843            if( clientIsSeed && peerIsSeed )
844                peer->doPurge = 1;
845#endif
846            break;
847        }
848
849        case TR_PEERMSG_CLIENT_BLOCK:
850            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
851            break;
852
853        case TR_PEERMSG_GOT_ERROR:
854            peer->doPurge = 1;
855            reconnectSoon( t );
856            break;
857
858        default:
859            assert(0);
860    }
861
862    if( needLock )
863        torrentUnlock( t );
864}
865
866static void
867myHandshakeDoneCB( tr_handshake    * handshake,
868                   tr_peerIo       * io,
869                   int               isConnected,
870                   const uint8_t   * peer_id,
871                   void            * vmanager )
872{
873    int ok = isConnected;
874    uint16_t port;
875    const struct in_addr * in_addr;
876    tr_peerMgr * manager = (tr_peerMgr*) vmanager;
877    const uint8_t * hash = NULL;
878    Torrent * t;
879    tr_handshake * ours;
880
881    assert( io != NULL );
882    assert( isConnected==0 || isConnected==1 );
883
884    ours = tr_ptrArrayRemoveSorted( manager->handshakes,
885                                    handshake,
886                                    handshakeCompare );
887    assert( ours != NULL );
888    assert( ours == handshake );
889
890    in_addr = tr_peerIoGetAddress( io, &port );
891
892    if( !tr_peerIoHasTorrentHash( io ) ) /* incoming connection gone wrong? */
893    {
894        tr_peerIoFree( io );
895        --manager->connectionCount;
896        return;
897    }
898
899    hash = tr_peerIoGetTorrentHash( io );
900    t = getExistingTorrent( manager, hash );
901
902    if( t != NULL )
903        torrentLock( t );
904
905    if( !t || !t->isRunning )
906    {
907        tr_peerIoFree( io );
908        --manager->connectionCount;
909    }
910    else if( !ok )
911    {
912        /* if we couldn't connect or were snubbed,
913         * the peer's probably not worth remembering. */
914        tr_peer * peer = getExistingPeer( t, in_addr );
915        tr_peerIoFree( io );
916        --manager->connectionCount;
917        if( peer )
918            peer->doPurge = 1;
919    }
920    else /* looking good */
921    {
922        tr_peer * peer = getPeer( t, in_addr );
923        if( peer->msgs != NULL ) { /* we already have this peer */
924            tr_peerIoFree( io );
925            --manager->connectionCount;
926        } else {
927            peer->port = port;
928            peer->io = io;
929            peer->msgs = tr_peerMsgsNew( t->tor, peer );
930            tr_free( peer->client );
931            peer->client = peer_id ? tr_clientForId( peer_id ) : NULL;
932            peer->msgsTag = tr_peerMsgsSubscribe( peer->msgs, msgsCallbackFunc, t );
933            rechokeSoon( t );
934        }
935    }
936
937    if( t != NULL )
938        torrentUnlock( t );
939}
940
941static void
942initiateHandshake( tr_peerMgr * manager, tr_peerIo * io )
943{
944    tr_handshake * handshake;
945
946    assert( manager->lockThread!=0 );
947    assert( io != NULL );
948
949    handshake = tr_handshakeNew( io,
950                                 manager->handle->encryptionMode,
951                                 myHandshakeDoneCB,
952                                 manager );
953    ++manager->connectionCount;
954
955    tr_ptrArrayInsertSorted( manager->handshakes, handshake, handshakeCompare );
956}
957
958void
959tr_peerMgrAddIncoming( tr_peerMgr      * manager,
960                       struct in_addr  * addr,
961                       uint16_t          port,
962                       int               socket )
963{
964    managerLock( manager );
965
966    if( getExistingHandshake( manager, addr ) == NULL )
967    {
968        tr_peerIo * io = tr_peerIoNewIncoming( manager->handle, addr, port, socket );
969        initiateHandshake( manager, io );
970    }
971
972    managerUnlock( manager );
973}
974
975static void
976maybeAddNewAtom( Torrent * t, const struct in_addr * addr, uint16_t port, uint8_t flags, uint8_t from )
977{
978    if( !peerIsKnown( t, addr ) )
979    {
980        struct peer_atom * a = tr_new( struct peer_atom, 1 );
981        a->addr = *addr;
982        a->port = port;
983        a->flags = flags;
984        a->from = from;
985        a->time = 0;
986fprintf( stderr, "torrent [%s] getting a new atom: %s\n", t->tor->info.name, tr_peerIoAddrStr(&a->addr,a->port) );
987        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
988    }
989}
990
991void
992tr_peerMgrAddPex( tr_peerMgr     * manager,
993                  const uint8_t  * torrentHash,
994                  int              from,
995                  const tr_pex   * pex,
996                  int              pexCount )
997{
998    Torrent * t;
999    const tr_pex * end;
1000
1001    managerLock( manager );
1002
1003    t = getExistingTorrent( manager, torrentHash );
1004    for( end=pex+pexCount; pex!=end; ++pex )
1005        maybeAddNewAtom( t, &pex->in_addr, pex->port, pex->flags, from );
1006    reconnectSoon( t );
1007
1008    managerUnlock( manager );
1009}
1010
1011void
1012tr_peerMgrAddPeers( tr_peerMgr    * manager,
1013                    const uint8_t * torrentHash,
1014                    int             from,
1015                    const uint8_t * peerCompact,
1016                    int             peerCount )
1017{
1018    int i;
1019    const uint8_t * walk = peerCompact;
1020    Torrent * t;
1021
1022    managerLock( manager );
1023
1024    t = getExistingTorrent( manager, torrentHash );
1025    for( i=0; t!=NULL && i<peerCount; ++i )
1026    {
1027        struct in_addr addr;
1028        uint16_t port;
1029        memcpy( &addr, walk, 4 ); walk += 4;
1030        memcpy( &port, walk, 2 ); walk += 2;
1031        maybeAddNewAtom( t, &addr, port, 0, from );
1032    }
1033    reconnectSoon( t );
1034
1035    managerUnlock( manager );
1036}
1037
1038/**
1039***
1040**/
1041
1042int
1043tr_peerMgrIsAcceptingConnections( const tr_peerMgr * manager UNUSED )
1044{
1045    return TRUE; /* manager->connectionCount < MAX_CONNECTED_PEERS; */
1046}
1047
1048void
1049tr_peerMgrSetBlame( tr_peerMgr     * manager UNUSED,
1050                    const uint8_t  * torrentHash UNUSED,
1051                    int              pieceIndex UNUSED,
1052                    int              success UNUSED )
1053{
1054    /*fprintf( stderr, "FIXME: tr_peerMgrSetBlame\n" );*/
1055}
1056
1057int
1058tr_pexCompare( const void * va, const void * vb )
1059{
1060    const tr_pex * a = (const tr_pex *) va;
1061    const tr_pex * b = (const tr_pex *) vb;
1062    int i = memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
1063    if( i ) return i;
1064    if( a->port < b->port ) return -1;
1065    if( a->port > b->port ) return 1;
1066    return 0;
1067}
1068
1069int tr_pexCompare( const void * a, const void * b );
1070
1071static int
1072peerPrefersCrypto( const tr_peer * peer )
1073{
1074    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1075        return TRUE;
1076
1077    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1078        return FALSE;
1079
1080    return tr_peerIoIsEncrypted( peer->io );
1081};
1082
1083int
1084tr_peerMgrGetPeers( tr_peerMgr      * manager,
1085                    const uint8_t   * torrentHash,
1086                    tr_pex         ** setme_pex )
1087{
1088    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1089    const int isLocked = torrentIsLocked( t );
1090    int i, peerCount;
1091    const tr_peer ** peers;
1092    tr_pex * pex;
1093    tr_pex * walk;
1094
1095    if( !isLocked )
1096        torrentLock( (Torrent*)t );
1097
1098    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1099    pex = walk = tr_new( tr_pex, peerCount );
1100
1101    for( i=0; i<peerCount; ++i, ++walk )
1102    {
1103        const tr_peer * peer = peers[i];
1104
1105        walk->in_addr = peer->in_addr;
1106
1107        walk->port = peer->port;
1108
1109        walk->flags = 0;
1110        if( peerPrefersCrypto(peer) )  walk->flags |= 1;
1111        if( peer->progress >= 1.0 )    walk->flags |= 2;
1112    }
1113
1114    assert( ( walk - pex ) == peerCount );
1115    qsort( pex, peerCount, sizeof(tr_pex), tr_pexCompare );
1116    *setme_pex = pex;
1117
1118    if( !isLocked )
1119        torrentUnlock( (Torrent*)t );
1120
1121    return peerCount;
1122}
1123
1124void
1125tr_peerMgrStartTorrent( tr_peerMgr     * manager,
1126                        const uint8_t  * torrentHash )
1127{
1128    Torrent * t;
1129
1130    managerLock( manager );
1131
1132    t = getExistingTorrent( manager, torrentHash );
1133    t->isRunning = 1;
1134    restartChokeTimer( t );
1135    reconnectSoon( t );
1136
1137    managerUnlock( manager );
1138}
1139
1140static void
1141stopTorrent( Torrent * t )
1142{
1143    int i, size;
1144    tr_peer ** peers;
1145
1146    assert( torrentIsLocked( t ) );
1147
1148    t->isRunning = 0;
1149    tr_timerFree( &t->rechokeTimer );
1150    tr_timerFree( &t->reconnectTimer );
1151
1152    peers = getConnectedPeers( t, &size );
1153    for( i=0; i<size; ++i )
1154        disconnectPeer( peers[i] );
1155
1156    tr_free( peers );
1157}
1158void
1159tr_peerMgrStopTorrent( tr_peerMgr     * manager,
1160                       const uint8_t  * torrentHash)
1161{
1162    managerLock( manager );
1163
1164    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1165
1166    managerUnlock( manager );
1167}
1168
1169void
1170tr_peerMgrAddTorrent( tr_peerMgr * manager,
1171                      tr_torrent * tor )
1172{
1173    Torrent * t;
1174
1175    managerLock( manager );
1176
1177    assert( tor != NULL );
1178    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1179
1180    t = tr_new0( Torrent, 1 );
1181    t->manager = manager;
1182    t->tor = tor;
1183    t->pool = tr_ptrArrayNew( );
1184    t->peers = tr_ptrArrayNew( );
1185    t->requested = tr_bitfieldNew( tor->blockCount );
1186    restartChokeTimer( t );
1187    restartReconnectTimer( t );
1188
1189    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
1190    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1191
1192    managerUnlock( manager );
1193}
1194
1195void
1196tr_peerMgrRemoveTorrent( tr_peerMgr     * manager,
1197                         const uint8_t  * torrentHash )
1198{
1199    Torrent * t;
1200
1201    managerLock( manager );
1202
1203    t = getExistingTorrent( manager, torrentHash );
1204    assert( t != NULL );
1205    stopTorrent( t );
1206    freeTorrent( manager, t );
1207
1208    managerUnlock( manager );
1209}
1210
1211void
1212tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1213                               const uint8_t    * torrentHash,
1214                               int8_t           * tab,
1215                               int                tabCount )
1216{
1217    int i;
1218    const Torrent * t;
1219    const tr_torrent * tor;
1220    float interval;
1221
1222    managerLock( (tr_peerMgr*)manager );
1223
1224    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1225    tor = t->tor;
1226    interval = tor->info.pieceCount / (float)tabCount;
1227
1228    memset( tab, 0, tabCount );
1229
1230    for( i=0; i<tabCount; ++i )
1231    {
1232        const int piece = i * interval;
1233
1234        if( tor == NULL )
1235            tab[i] = 0;
1236        else if( tr_cpPieceIsComplete( tor->completion, piece ) )
1237            tab[i] = -1;
1238        else {
1239            int j, peerCount;
1240            const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1241            for( j=0; j<peerCount; ++j )
1242                if( tr_bitfieldHas( peers[j]->have, i ) )
1243                    ++tab[i];
1244        }
1245    }
1246
1247    managerUnlock( (tr_peerMgr*)manager );
1248}
1249
1250/* Returns the pieces that we and/or a connected peer has */
1251tr_bitfield*
1252tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1253                        const uint8_t    * torrentHash )
1254{
1255    int i, size;
1256    const Torrent * t;
1257    const tr_peer ** peers;
1258    tr_bitfield * pieces;
1259
1260    managerLock( (tr_peerMgr*)manager );
1261
1262    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1263    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1264    pieces = tr_bitfieldDup( tr_cpPieceBitfield( t->tor->completion ) );
1265    for( i=0; i<size; ++i )
1266        if( peers[i]->io != NULL )
1267            tr_bitfieldAnd( pieces, peers[i]->have );
1268
1269    managerUnlock( (tr_peerMgr*)manager );
1270    return pieces;
1271}
1272
1273void
1274tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1275                        const uint8_t    * torrentHash,
1276                        int              * setmePeersTotal,
1277                        int              * setmePeersConnected,
1278                        int              * setmePeersSendingToUs,
1279                        int              * setmePeersGettingFromUs,
1280                        int              * setmePeersFrom )
1281{
1282    int i, size;
1283    const Torrent * t;
1284    const tr_peer ** peers;
1285
1286    managerLock( (tr_peerMgr*)manager );
1287
1288    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1289    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1290
1291    *setmePeersTotal          = size;
1292    *setmePeersConnected      = 0;
1293    *setmePeersSendingToUs    = 0;
1294    *setmePeersGettingFromUs  = 0;
1295
1296    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1297        setmePeersFrom[i] = 0;
1298
1299    for( i=0; i<size; ++i )
1300    {
1301        const tr_peer * peer = peers[i];
1302
1303        if( peer->io == NULL ) /* not connected */
1304            continue;
1305
1306        ++*setmePeersConnected;
1307
1308        ++setmePeersFrom[peer->from];
1309
1310        if( tr_peerIoGetRateToPeer( peer->io ) > 0.01 )
1311            ++*setmePeersGettingFromUs;
1312
1313        if( tr_peerIoGetRateToClient( peer->io ) > 0.01 )
1314            ++*setmePeersSendingToUs;
1315    }
1316
1317    managerUnlock( (tr_peerMgr*)manager );
1318}
1319
1320struct tr_peer_stat *
1321tr_peerMgrPeerStats( const tr_peerMgr  * manager,
1322                     const uint8_t     * torrentHash,
1323                     int               * setmeCount UNUSED )
1324{
1325    int i, size;
1326    const Torrent * t;
1327    const tr_peer ** peers;
1328    tr_peer_stat * ret;
1329
1330    assert( manager != NULL );
1331    managerLock( (tr_peerMgr*)manager );
1332
1333    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1334    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1335
1336    ret = tr_new0( tr_peer_stat, size );
1337
1338    for( i=0; i<size; ++i )
1339    {
1340        const tr_peer * peer = peers[i];
1341        const int live = peer->io != NULL;
1342        tr_peer_stat * stat = ret + i;
1343
1344        tr_netNtop( &peer->in_addr, stat->addr, sizeof(stat->addr) );
1345        stat->port             = peer->port;
1346        stat->from             = peer->from;
1347        stat->client           = peer->client;
1348        stat->progress         = peer->progress;
1349        stat->isConnected      = live ? 1 : 0;
1350        stat->isEncrypted      = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1351        stat->uploadToRate     = tr_peerIoGetRateToPeer( peer->io );
1352        stat->downloadFromRate = tr_peerIoGetRateToClient( peer->io );
1353        stat->isDownloading    = stat->uploadToRate > 0.01;
1354        stat->isUploading      = stat->downloadFromRate > 0.01;
1355    }
1356
1357    *setmeCount = size;
1358
1359    managerUnlock( (tr_peerMgr*)manager );
1360    return ret;
1361}
1362
1363/**
1364***
1365**/
1366
1367typedef struct
1368{
1369    tr_peer * peer;
1370    float rate;
1371    int randomKey;
1372    int preferred;
1373    int doUnchoke;
1374}
1375ChokeData;
1376
1377static int
1378compareChoke( const void * va, const void * vb )
1379{
1380    const ChokeData * a = ( const ChokeData * ) va;
1381    const ChokeData * b = ( const ChokeData * ) vb;
1382
1383    if( a->preferred != b->preferred )
1384        return a->preferred ? -1 : 1;
1385
1386    if( a->preferred )
1387    {
1388        if( a->rate > b->rate ) return -1;
1389        if( a->rate < b->rate ) return 1;
1390        return 0;
1391    }
1392    else
1393    {
1394        return a->randomKey - b->randomKey;
1395    }
1396}
1397
1398static int
1399clientIsSnubbedBy( const tr_peer * peer )
1400{
1401    assert( peer != NULL );
1402
1403    return peer->peerSentPieceDataAt < (time(NULL) - SNUBBED_SEC);
1404}
1405
1406/**
1407***
1408**/
1409
1410static void
1411rechokeLeech( Torrent * t )
1412{
1413    int i, peerCount, size=0, unchoked=0;
1414    const time_t ignorePeersNewerThan = time(NULL) - MIN_CHOKE_PERIOD_SEC;
1415    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1416    ChokeData * choke = tr_new0( ChokeData, peerCount );
1417
1418    assert( torrentIsLocked( t ) );
1419   
1420    /* sort the peers by preference and rate */
1421    for( i=0; i<peerCount; ++i )
1422    {
1423        tr_peer * peer = peers[i];
1424        ChokeData * node;
1425        if( peer->chokeChangedAt > ignorePeersNewerThan )
1426            continue;
1427
1428        node = &choke[size++];
1429        node->peer = peer;
1430        node->preferred = peer->peerIsInterested && !clientIsSnubbedBy(peer);
1431        node->randomKey = tr_rand( INT_MAX );
1432        node->rate = tr_peerIoGetRateToClient( peer->io );
1433    }
1434
1435    qsort( choke, size, sizeof(ChokeData), compareChoke );
1436
1437    for( i=0; i<size && i<NUM_UNCHOKED_PEERS_PER_TORRENT; ++i ) {
1438        choke[i].doUnchoke = 1;
1439        ++unchoked;
1440    }
1441
1442    for( ; i<size; ++i ) {
1443        choke[i].doUnchoke = 1;
1444        ++unchoked;
1445        if( choke[i].peer->peerIsInterested )
1446            break;
1447    }
1448
1449    for( i=0; i<size; ++i )
1450        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1451
1452    /* cleanup */
1453    tr_free( choke );
1454    tr_free( peers );
1455}
1456
1457static void
1458rechokeSeed( Torrent * t )
1459{
1460    int i, size;
1461    tr_peer ** peers;
1462
1463    assert( torrentIsLocked( t ) );
1464
1465    peers = getConnectedPeers( t, &size );
1466
1467    /* FIXME */
1468    for( i=0; i<size; ++i )
1469        tr_peerMsgsSetChoke( peers[i]->msgs, FALSE );
1470
1471    tr_free( peers );
1472}
1473
1474static int
1475rechokePulse( void * vtorrent )
1476{
1477    Torrent * t = vtorrent;
1478    torrentLock( t );
1479
1480    const int done = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1481    if( done )
1482        rechokeLeech( vtorrent );
1483    else
1484        rechokeSeed( vtorrent );
1485
1486    torrentUnlock( t );
1487    return TRUE;
1488}
1489
1490/***
1491****
1492****
1493****
1494***/
1495
1496struct tr_connection
1497{
1498    tr_peer * peer;
1499    double throughput;
1500};
1501
1502#define LAISSEZ_FAIRE_PERIOD_SECS 60
1503
1504static int
1505compareConnections( const void * va, const void * vb )
1506{
1507    const struct tr_connection * a = va;
1508    const struct tr_connection * b = vb;
1509    if( a->throughput < b->throughput ) return -1;
1510    if( a->throughput > b->throughput ) return 1;
1511    return 0;
1512}
1513
1514static struct tr_connection *
1515getWeakConnections( Torrent * t, int * setmeSize )
1516{
1517    int i, insize, outsize;
1518    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( t->peers, &insize );
1519    struct tr_connection * ret = tr_new( struct tr_connection, insize );
1520    const int clientIsSeed = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1521    const time_t now = time( NULL );
1522
1523    assert( torrentIsLocked( t ) );
1524
1525    for( i=outsize=0; i<insize; ++i )
1526    {
1527        tr_peer * peer = peers[i];
1528        int isWeak;
1529        const int peerIsSeed = peer->progress >= 1.0;
1530        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1531        const double throughput = (2*tr_peerIoGetRateToPeer( peer->io ))
1532                                + tr_peerIoGetRateToClient( peer->io );
1533
1534        assert( atom != NULL );
1535
1536        /* if we're both seeds, give a little bit of time for
1537         * a mutual pex -- peer-msgs initiates a pex exchange
1538         * on startup -- and then disconnect */
1539        if( peerIsSeed && clientIsSeed && (now-atom->time >= 30) )
1540            isWeak = TRUE;
1541        else if( ( now - atom->time ) < LAISSEZ_FAIRE_PERIOD_SECS )
1542            isWeak = FALSE;
1543        else if( throughput >= 5 )
1544            isWeak = FALSE;
1545        else
1546            isWeak = TRUE;
1547
1548        if( isWeak )
1549        {
1550            ret[outsize].peer = peer;
1551            ret[outsize].throughput = throughput;
1552            ++outsize;
1553        }
1554    }
1555
1556    qsort( ret, outsize, sizeof(struct tr_connection), compareConnections );
1557    *setmeSize = outsize;
1558    return ret;
1559}
1560
1561static int
1562compareAtomByTime( const void * va, const void * vb )
1563{
1564    const struct peer_atom * a = * (const struct peer_atom**) va;
1565    const struct peer_atom * b = * (const struct peer_atom**) vb;
1566    if( a->time < b->time ) return -1;
1567    if( a->time > b->time ) return 1;
1568    return 0;
1569}
1570
1571static struct peer_atom **
1572getPeerCandidates( Torrent * t, int * setmeSize )
1573{
1574    int i, insize, outsize;
1575    struct peer_atom ** atoms;
1576    struct peer_atom ** ret;
1577    const time_t now = time( NULL );
1578    const int seed = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1579
1580    assert( torrentIsLocked( t ) );
1581
1582    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &insize );
1583    ret = tr_new( struct peer_atom*, insize );
1584    for( i=outsize=0; i<insize; ++i )
1585    {
1586        struct peer_atom * atom = atoms[i];
1587
1588        /* we don't need two connections to the same peer... */
1589        if( peerIsInUse( t, &atom->addr ) ) {
1590            fprintf( stderr, "RECONNECT peer %d (%s) is in use...\n", i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1591            continue;
1592        }
1593
1594        /* no need to connect if we're both seeds... */
1595        if( seed && ( atom->flags & 2 ) ) {
1596            fprintf( stderr, "RECONNECT peer %d (%s) is a seed and so are we...\n", i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1597            continue;
1598        }
1599
1600        /* if we used this peer recently, give someone else a turn */
1601        if( ( now - atom->time ) <  LAISSEZ_FAIRE_PERIOD_SECS ) {
1602            fprintf( stderr, "RECONNECT peer %d (%s) is in its grace period...\n", i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1603            continue;
1604        }
1605
1606        ret[outsize++] = atom;
1607    }
1608
1609    qsort( ret, outsize, sizeof(struct peer_atom*), compareAtomByTime );
1610    *setmeSize = outsize;
1611    return ret;
1612}
1613
1614static int
1615reconnectPulse( void * vtorrent )
1616{
1617    Torrent * t = vtorrent;
1618    struct peer_atom ** candidates;
1619    struct tr_connection * connections;
1620    int i, nCandidates, nConnections, nCull, nAdd;
1621    int peerCount;
1622
1623    torrentLock( t );
1624
1625    connections = getWeakConnections( t, &nConnections );
1626    candidates = getPeerCandidates( t, &nCandidates );
1627
1628    /* figure out how many peers to disconnect */
1629    nCull = nConnections-4; 
1630
1631fprintf( stderr, "RECONNECT pulse for [%s]: %d connections, %d candidates, %d atoms, %d cull\n", t->tor->info.name, nConnections, nCandidates, tr_ptrArraySize(t->pool), nCull );
1632
1633for( i=0; i<nConnections; ++i )
1634fprintf( stderr, "connection #%d: %s @ %.2f\n", i+1, tr_peerIoAddrStr( &connections[i].peer->in_addr, connections[i].peer->port ), connections[i].throughput );
1635
1636    /* disconnect some peers */
1637    for( i=0; i<nCull && i<nConnections; ++i ) {
1638        const double throughput = connections[i].throughput;
1639        tr_peer * peer = connections[i].peer;
1640        fprintf( stderr, "RECONNECT culling peer %s, whose throughput was %f\n", tr_peerIoAddrStr(&peer->in_addr, peer->port), throughput );
1641        removePeer( t, peer );
1642    }
1643
1644    /* add some new ones */
1645    peerCount = tr_ptrArraySize( t->peers );
1646    nAdd = MAX_CONNECTED_PEERS_PER_TORRENT - peerCount;
1647    for( i=0; i<nAdd && i<nCandidates; ++i ) {
1648        struct peer_atom * atom = candidates[i];
1649        tr_peerIo * io = tr_peerIoNewOutgoing( t->manager->handle, &atom->addr, atom->port, t->hash );
1650fprintf( stderr, "RECONNECT adding an outgoing connection...\n" );
1651        initiateHandshake( t->manager, io );
1652        atom->time = time( NULL );
1653    }
1654
1655    /* cleanup */
1656    tr_free( connections );
1657    tr_free( candidates );
1658    torrentUnlock( t );
1659    return TRUE;
1660}
Note: See TracBrowser for help on using the repository browser.