source: trunk/libtransmission/peer-mgr.c @ 3243

Last change on this file since 3243 was 3243, checked in by charles, 15 years ago

tweaks

  • Property svn:keywords set to Date Rev Author Id
File size: 43.1 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 3243 2007-10-01 00:08:12Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17#include <limits.h> /* INT_MAX */
18
19#include "transmission.h"
20#include "clients.h"
21#include "completion.h"
22#include "crypto.h"
23#include "handshake.h"
24#include "net.h"
25#include "peer-io.h"
26#include "peer-mgr.h"
27#include "peer-mgr-private.h"
28#include "peer-msgs.h"
29#include "platform.h"
30#include "ptrarray.h"
31#include "ratecontrol.h"
32#include "trevent.h"
33#include "utils.h"
34
35#include "pthread.h"
36
37enum
38{
39    /* how frequently to change which peers are choked */
40    RECHOKE_PERIOD_MSEC = (15 * 1000),
41
42    /* how frequently to decide which peers live and die */
43    RECONNECT_PERIOD_MSEC = (15 * 1000),
44
45    /* how frequently to refill peers' request lists */
46    REFILL_PERIOD_MSEC = 1000,
47
48    /* how many peers to unchoke per-torrent. */
49    /* FIXME: make this user-configurable? */
50    NUM_UNCHOKED_PEERS_PER_TORRENT = 8,
51
52    /* don't change a peer's choke status more often than this */
53    MIN_CHOKE_PERIOD_SEC = 10,
54
55    /* how soon is `soon' in the rechokeSoon, reconnecSoon funcs */
56    SOON_MSEC = 1000,
57
58    /* following the BT spec, we consider ourselves `snubbed' if
59     * we're we don't get piece data from a peer in this long */
60    SNUBBED_SEC = 60,
61
62    /* this is arbitrary and, hopefully, temporary until we come up
63     * with a better idea for managing the connection limits */
64    MAX_CONNECTED_PEERS_PER_TORRENT = 100,
65};
66
67/**
68***
69**/
70
71/* We keep one of these for every peer we know about, whether
72 * it's connected or not, so the struct must be small.
73 * When our current connections underperform, we dip back
74 * int this list for new ones. */
75struct peer_atom
76{   
77    uint8_t from;
78    uint8_t flags; /* these match the added_f flags */
79    uint16_t port;
80    struct in_addr addr; 
81    time_t time;
82};
83
84typedef struct
85{
86    uint8_t hash[SHA_DIGEST_LENGTH];
87    tr_ptrArray * pool; /* struct peer_atom */
88    tr_ptrArray * peers; /* tr_peer */
89    tr_timer * reconnectTimer;
90    tr_timer * reconnectSoonTimer;
91    tr_timer * rechokeTimer;
92    tr_timer * rechokeSoonTimer;
93    tr_timer * refillTimer;
94    tr_torrent * tor;
95    tr_bitfield * requested;
96
97    unsigned int isRunning : 1;
98
99    struct tr_peerMgr * manager;
100}
101Torrent;
102
103struct tr_peerMgr
104{
105    tr_handle * handle;
106    tr_ptrArray * torrents; /* Torrent */
107    int connectionCount;
108    tr_ptrArray * handshakes; /* in-process */
109    tr_lock * lock;
110    pthread_t lockThread;
111};
112
113/**
114***
115**/
116
117static void
118managerLock( struct tr_peerMgr * manager )
119{
120    assert( manager->lockThread != pthread_self() );
121    tr_lockLock( manager->lock );
122    manager->lockThread = pthread_self();
123}
124static void
125managerUnlock( struct tr_peerMgr * manager )
126{
127    assert( manager->lockThread == pthread_self() );
128    manager->lockThread = 0;
129    tr_lockUnlock( manager->lock );
130}
131static void
132torrentLock( Torrent * torrent )
133{
134    managerLock( torrent->manager );
135}
136static void
137torrentUnlock( Torrent * torrent )
138{
139    managerUnlock( torrent->manager );
140}
141static int
142torrentIsLocked( const Torrent * t )
143{
144    return t!=NULL && t->manager!=NULL && t->manager->lockThread!=0;
145}
146
147/**
148***
149**/
150
151static int
152compareAddresses( const struct in_addr * a, const struct in_addr * b )
153{
154    return tr_compareUint32( a->s_addr, b->s_addr );
155}
156
157static int
158handshakeCompareToAddr( const void * va, const void * vb )
159{
160    const tr_handshake * a = va;
161    return compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
162}
163
164static int
165handshakeCompare( const void * a, const void * b )
166{
167    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
168}
169
170static tr_handshake*
171getExistingHandshake( tr_peerMgr * mgr, const struct in_addr * in_addr )
172{
173    return tr_ptrArrayFindSorted( mgr->handshakes,
174                                  in_addr,
175                                  handshakeCompareToAddr );
176}
177
178static int
179comparePeerAtomToAddress( const void * va, const void * vb )
180{
181    const struct peer_atom * a = va;
182    return compareAddresses( &a->addr, vb );
183}
184
185static int
186comparePeerAtoms( const void * va, const void * vb )
187{
188    const struct peer_atom * b = vb;
189    return comparePeerAtomToAddress( va, &b->addr );
190}
191
192/**
193***
194**/
195
196static int
197torrentCompare( const void * va, const void * vb )
198{
199    const Torrent * a = va;
200    const Torrent * b = vb;
201    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
202}
203
204static int
205torrentCompareToHash( const void * va, const void * vb )
206{
207    const Torrent * a = (const Torrent*) va;
208    const uint8_t * b_hash = (const uint8_t*) vb;
209    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
210}
211
212static Torrent*
213getExistingTorrent( tr_peerMgr * manager, const uint8_t * hash )
214{
215    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
216                                             hash,
217                                             torrentCompareToHash );
218}
219
220static int
221peerCompare( const void * va, const void * vb )
222{
223    const tr_peer * a = (const tr_peer *) va;
224    const tr_peer * b = (const tr_peer *) vb;
225    return compareAddresses( &a->in_addr, &b->in_addr );
226}
227
228static int
229peerCompareToAddr( const void * va, const void * vb )
230{
231    const tr_peer * a = (const tr_peer *) va;
232    return compareAddresses( &a->in_addr, vb );
233}
234
235static tr_peer*
236getExistingPeer( Torrent * torrent, const struct in_addr * in_addr )
237{
238    assert( torrentIsLocked( torrent ) );
239    assert( in_addr != NULL );
240
241    return (tr_peer*) tr_ptrArrayFindSorted( torrent->peers,
242                                             in_addr,
243                                             peerCompareToAddr );
244}
245
246static struct peer_atom*
247getExistingAtom( const Torrent * t, const struct in_addr * addr )
248{
249    assert( torrentIsLocked( t ) );
250    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
251}
252
253static int
254peerIsKnown( const Torrent * t, const struct in_addr * addr )
255{
256    return getExistingAtom( t, addr ) != NULL;
257}
258
259static int
260peerIsInUse( const Torrent * t, const struct in_addr * addr )
261{
262    assert( torrentIsLocked ( t ) );
263
264    return ( getExistingPeer( (Torrent*)t, addr ) != NULL )
265        || ( getExistingHandshake( ((Torrent*)t)->manager, addr ) != NULL );
266}
267
268static tr_peer*
269getPeer( Torrent * torrent, const struct in_addr * in_addr )
270{
271    tr_peer * peer;
272
273    assert( torrentIsLocked( torrent ) );
274
275    peer = getExistingPeer( torrent, in_addr );
276
277    if( peer == NULL )
278    {
279        peer = tr_new0( tr_peer, 1 );
280        memcpy( &peer->in_addr, in_addr, sizeof(struct in_addr) );
281        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
282    }
283
284    return peer;
285}
286
287static void
288disconnectPeer( tr_peer * peer )
289{
290    assert( peer != NULL );
291
292    tr_peerIoFree( peer->io );
293    peer->io = NULL;
294
295    if( peer->msgs != NULL )
296    {
297        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
298        tr_peerMsgsFree( peer->msgs );
299        peer->msgs = NULL;
300    }
301
302    tr_bitfieldFree( peer->have );
303    peer->have = NULL;
304
305    tr_bitfieldFree( peer->blame );
306    peer->blame = NULL;
307
308    tr_bitfieldFree( peer->banned );
309    peer->banned = NULL;
310}
311
312static void
313freePeer( tr_peer * peer )
314{
315    disconnectPeer( peer );
316    tr_free( peer->client );
317    tr_free( peer );
318}
319
320static void
321removePeer( Torrent * t, tr_peer * peer )
322{
323    tr_peer * removed;
324    struct peer_atom * atom;
325
326    assert( torrentIsLocked( t ) );
327
328    atom = getExistingAtom( t, &peer->in_addr );
329    assert( atom != NULL );
330    atom->time = time( NULL );
331
332    removed = tr_ptrArrayRemoveSorted  ( t->peers, peer, peerCompare );
333    assert( removed == peer );
334    freePeer( removed );
335}
336
337static void
338freeTorrent( tr_peerMgr * manager, Torrent * t )
339{
340    uint8_t hash[SHA_DIGEST_LENGTH];
341
342    assert( t != NULL );
343    assert( t->peers != NULL );
344    assert( torrentIsLocked( t ) );
345    assert( getExistingTorrent( manager, t->hash ) != NULL );
346
347    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
348
349    tr_timerFree( &t->reconnectTimer );
350    tr_timerFree( &t->reconnectSoonTimer );
351    tr_timerFree( &t->rechokeTimer );
352    tr_timerFree( &t->rechokeSoonTimer );
353    tr_timerFree( &t->refillTimer );
354
355    tr_bitfieldFree( t->requested );
356    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
357    tr_ptrArrayFree( t->peers, (PtrArrayForeachFunc)freePeer );
358    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
359    tr_free( t );
360
361    assert( getExistingTorrent( manager, hash ) == NULL );
362}
363
364/**
365***
366**/
367
368struct tr_bitfield *
369tr_peerMgrGenerateAllowedSet( const uint32_t         setCount,
370                              const uint32_t         pieceCount,
371                              const uint8_t          infohash[20],
372                              const struct in_addr * ip )
373{
374    /* This has been checked against the spec example implementation. Feeding it with :
375    setCount = 9, pieceCount = 1313, infohash = Oxaa,0xaa,...0xaa, ip = 80.4.4.200
376generate :
377    1059, 431, 808, 1217, 287, 376, 1188, 353, 508
378    but since we're storing in a bitfield, it won't be in this order... */
379    /* TODO : We should translate link-local IPv4 adresses to external IP,
380     * so that being on same local network gives us the same allowed pieces */
381   
382    printf( "%d piece allowed fast set for torrent with %d pieces and hex infohash\n", setCount, pieceCount );
383    printf( "%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x for node with IP %s:\n",
384            infohash[0], infohash[1], infohash[2], infohash[3], infohash[4], infohash[5], infohash[6], infohash[7], infohash[8], infohash[9],
385            infohash[10], infohash[11], infohash[12], infohash[13], infohash[14], infohash[15], infohash[16], infohash[7], infohash[18], infohash[19],
386            inet_ntoa( *ip ) );
387   
388    uint8_t *seed = malloc(4 + SHA_DIGEST_LENGTH);
389    char buf[4];
390    uint32_t allowedPieceCount = 0;
391    tr_bitfield_t * ret;
392   
393    ret = tr_bitfieldNew( pieceCount );
394   
395    /* We need a seed based on most significant bytes of peer address
396        concatenated with torrent's infohash */
397    *(uint32_t*)buf = ntohl( htonl(ip->s_addr) & 0xffffff00 );
398   
399    memcpy( seed, &buf, 4 );
400    memcpy( seed + 4, infohash, SHA_DIGEST_LENGTH );
401   
402    tr_sha1( seed, seed, 4 + SHA_DIGEST_LENGTH, NULL );
403   
404    while ( allowedPieceCount < setCount )
405    {
406        int i;
407        for ( i = 0 ; i < 5 && allowedPieceCount < setCount ; i++ )
408        {
409            /* We generate indices from 4-byte chunks of the seed */
410            uint32_t j = i * 4;
411            uint32_t y = ntohl( *(uint32_t*)(seed + j) );
412            uint32_t index = y % pieceCount;
413           
414            if ( !tr_bitfieldHas( ret, index ) )
415            {
416                tr_bitfieldAdd( ret, index );
417                allowedPieceCount++;
418            }
419        }
420        /* We randomize the seed, in case we need to iterate more */
421        tr_sha1( seed, seed, SHA_DIGEST_LENGTH, NULL );
422    }
423    tr_free( seed );
424   
425    return ret;
426}
427
428tr_peerMgr*
429tr_peerMgrNew( tr_handle * handle )
430{
431    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
432    m->handle = handle;
433    m->torrents = tr_ptrArrayNew( );
434    m->handshakes = tr_ptrArrayNew( );
435    m->lock = tr_lockNew( );
436    return m;
437}
438
439void
440tr_peerMgrFree( tr_peerMgr * manager )
441{
442    managerLock( manager );
443
444    tr_ptrArrayFree( manager->handshakes, (PtrArrayForeachFunc)tr_handshakeAbort );
445    tr_ptrArrayFree( manager->torrents, (PtrArrayForeachFunc)freeTorrent );
446
447    managerUnlock( manager );
448    tr_lockFree( manager->lock );
449    tr_free( manager );
450}
451
452static tr_peer**
453getConnectedPeers( Torrent * t, int * setmeCount )
454{
455    int i, peerCount, connectionCount;
456    tr_peer **peers;
457    tr_peer **ret;
458
459    assert( torrentIsLocked( t ) );
460
461    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
462    ret = tr_new( tr_peer*, peerCount );
463
464    for( i=connectionCount=0; i<peerCount; ++i )
465        if( peers[i]->msgs != NULL )
466            ret[connectionCount++] = peers[i];
467
468    *setmeCount = connectionCount;
469    return ret;
470}
471
472/***
473****  Refill
474***/
475
476struct tr_refill_piece
477{
478    tr_priority_t priority;
479    uint32_t piece;
480    uint32_t peerCount;
481    uint32_t fastAllowed;
482};
483
484static int
485compareRefillPiece (const void * aIn, const void * bIn)
486{
487    const struct tr_refill_piece * a = aIn;
488    const struct tr_refill_piece * b = bIn;
489
490    /* if one piece has a higher priority, it goes first */
491    if (a->priority != b->priority)
492        return a->priority > b->priority ? -1 : 1;
493
494    /* otherwise if one has fewer peers, it goes first */
495    if (a->peerCount != b->peerCount)
496        return a->peerCount < b->peerCount ? -1 : 1;
497   
498    /* otherwise if one *might be* fastallowed to us */
499    if (a->fastAllowed != b->fastAllowed)
500        return a->fastAllowed < b->fastAllowed ? -1 : 1;
501
502    /* otherwise go with the earlier piece */
503    return a->piece - b->piece;
504}
505
506static int
507isPieceInteresting( const tr_torrent  * tor,
508                    int                 piece )
509{
510    if( tor->info.pieces[piece].dnd ) /* we don't want it */
511        return 0;
512
513    if( tr_cpPieceIsComplete( tor->completion, piece ) ) /* we already have it */
514        return 0;
515
516    return 1;
517}
518
519static uint32_t*
520getPreferredPieces( Torrent     * t,
521                    uint32_t    * pieceCount )
522{
523    const tr_torrent * tor = t->tor;
524    const tr_info * inf = &tor->info;
525    int i;
526    uint32_t poolSize = 0;
527    uint32_t * pool = tr_new( uint32_t, inf->pieceCount );
528    int peerCount;
529    tr_peer** peers;
530
531    assert( torrentIsLocked( t ) );
532
533    peers = getConnectedPeers( t, &peerCount );
534
535    for( i=0; i<inf->pieceCount; ++i )
536        if( isPieceInteresting( tor, i ) )
537            pool[poolSize++] = i;
538
539    /* sort the pool from most interesting to least... */
540    if( poolSize > 1 )
541    {
542        uint32_t j;
543        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
544
545        for( j=0; j<poolSize; ++j )
546        {
547            int k;
548            const int piece = pool[j];
549            struct tr_refill_piece * setme = p + j;
550
551            setme->piece = piece;
552            setme->priority = inf->pieces[piece].priority;
553            setme->peerCount = 0;
554            setme->fastAllowed = 0;
555            /* FIXME */
556//            setme->fastAllowed = tr_bitfieldHas( t->tor->allowedList, i);
557
558            for( k=0; k<peerCount; ++k ) {
559                const tr_peer * peer = peers[k];
560                if( peer->peerIsInterested && !peer->clientIsChoked && tr_bitfieldHas( peer->have, piece ) )
561                    ++setme->peerCount;
562            }
563        }
564
565        qsort (p, poolSize, sizeof(struct tr_refill_piece), compareRefillPiece);
566
567        for( j=0; j<poolSize; ++j )
568            pool[j] = p[j].piece;
569
570        tr_free( p );
571    }
572
573#if 0
574fprintf (stderr, "new pool: ");
575for (i=0; i<15 && i<(int)poolSize; ++i ) fprintf (stderr, "%d, ", (int)pool[i] );
576fprintf (stderr, "\n");
577#endif
578    tr_free( peers );
579
580    *pieceCount = poolSize;
581    return pool;
582}
583
584static uint64_t*
585getPreferredBlocks( Torrent * t, uint64_t * setmeCount )
586{
587    uint32_t i;
588    uint32_t pieceCount;
589    uint32_t * pieces;
590    uint64_t *req, *unreq, *ret, *walk;
591    int reqCount, unreqCount;
592    const tr_torrent * tor = t->tor;
593
594    assert( torrentIsLocked( t ) );
595
596    pieces = getPreferredPieces( t, &pieceCount );
597/*fprintf( stderr, "REFILL refillPulse for {%s} got %d of %d pieces\n", tor->info.name, (int)pieceCount, t->tor->info.pieceCount );*/
598
599    req = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
600    reqCount = 0;
601    unreq = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
602    unreqCount = 0;
603
604    for( i=0; i<pieceCount; ++i ) {
605        const uint32_t index = pieces[i];
606        const int begin = tr_torPieceFirstBlock( tor, index );
607        const int end = begin + tr_torPieceCountBlocks( tor, (int)index );
608        int block;
609        for( block=begin; block<end; ++block )
610            if( tr_cpBlockIsComplete( tor->completion, block ) )
611                continue;
612            else if( tr_bitfieldHas( t->requested, block ) )
613                req[reqCount++] = block;
614            else
615                unreq[unreqCount++] = block;
616    }
617
618/*fprintf( stderr, "REFILL refillPulse for {%s} reqCount is %d, unreqCount is %d\n", tor->info.name, (int)reqCount, (int)unreqCount );*/
619    ret = walk = tr_new( uint64_t, unreqCount + reqCount );
620    memcpy( walk, unreq, sizeof(uint64_t) * unreqCount );
621    walk += unreqCount;
622    memcpy( walk, req, sizeof(uint64_t) * reqCount );
623    walk += reqCount;
624    assert( ( walk - ret ) == ( unreqCount + reqCount ) );
625    *setmeCount = walk - ret;
626
627    tr_free( req );
628    tr_free( unreq );
629    tr_free( pieces );
630
631    return ret;
632}
633
634static int
635refillPulse( void * vtorrent )
636{
637    Torrent * t = vtorrent;
638    tr_torrent * tor = t->tor;
639    uint32_t i;
640    int peerCount;
641    tr_peer ** peers;
642    uint64_t blockCount;
643    uint64_t * blocks;
644
645    if( !t->isRunning )
646        return TRUE;
647    if( tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE )
648        return TRUE;
649
650    torrentLock( t );
651
652    blocks = getPreferredBlocks( t, &blockCount );
653    peers = getConnectedPeers( t, &peerCount );
654
655/*fprintf( stderr, "REFILL refillPulse for {%s} got %d blocks\n", tor->info.name, (int)blockCount );*/
656
657    for( i=0; peerCount && i<blockCount; ++i )
658    {
659        const int block = blocks[i];
660        const uint32_t index = tr_torBlockPiece( tor, block );
661        const uint32_t begin = (block * tor->blockSize) - (index * tor->info.pieceSize);
662        const uint32_t length = tr_torBlockCountBytes( tor, block );
663        int j;
664        assert( _tr_block( tor, index, begin ) == block );
665        assert( begin < (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
666        assert( (begin + length) <= (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
667
668
669        /* find a peer who can ask for this block */
670        for( j=0; j<peerCount; )
671        {
672            const int val = tr_peerMsgsAddRequest( peers[j]->msgs, index, begin, length );
673            switch( val )
674            {
675                case TR_ADDREQ_FULL: 
676                case TR_ADDREQ_CLIENT_CHOKED:
677                    memmove( peers+j, peers+j+1, sizeof(tr_peer*)*(--peerCount-j) );
678                    break;
679
680                case TR_ADDREQ_MISSING: 
681                    ++j;
682                    break;
683
684                case TR_ADDREQ_OK:
685                    /*fprintf( stderr, "REFILL peer %p took the request for block %d\n", peers[j]->msgs, block );*/
686                    tr_bitfieldAdd( t->requested, block );
687                    j = peerCount;
688                    break;
689
690                default:
691                    assert( 0 && "unhandled value" );
692                    break;
693            }
694        }
695    }
696
697    /* cleanup */
698    tr_free( peers );
699    tr_free( blocks );
700
701    t->refillTimer = NULL;
702
703    torrentUnlock( t );
704    return FALSE;
705}
706
707static void
708broadcastClientHave( Torrent * t, uint32_t index )
709{
710    int i, size;
711    tr_peer ** peers;
712
713    assert( torrentIsLocked( t ) );
714
715    peers = getConnectedPeers( t, &size );
716    for( i=0; i<size; ++i )
717        tr_peerMsgsHave( peers[i]->msgs, index );
718    tr_free( peers );
719}
720
721static void
722broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
723{
724    int i, size;
725    tr_peer ** peers;
726
727    assert( torrentIsLocked( t ) );
728
729    peers = getConnectedPeers( t, &size );
730    for( i=0; i<size; ++i )
731        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
732    tr_free( peers );
733}
734
735/**
736***
737**/
738
739static int reconnectPulse( void * vtorrent );
740
741static void
742restartReconnectTimer( Torrent * t )
743{
744    tr_timerFree( &t->reconnectTimer );
745    if( t->isRunning )
746        t->reconnectTimer = tr_timerNew( t->manager->handle, reconnectPulse, t, RECONNECT_PERIOD_MSEC );
747}
748
749static void
750reconnectNow( Torrent * t )
751{
752    reconnectPulse( t );
753    restartReconnectTimer( t );
754}
755
756static int
757reconnectSoonCB( void * vt )
758{
759    Torrent * t = vt;
760    reconnectNow( t );
761    t->reconnectSoonTimer = NULL;
762    return FALSE;
763}
764
765static void
766reconnectSoon( Torrent * t )
767{
768    if( t->reconnectSoonTimer == NULL )
769        t->reconnectSoonTimer = tr_timerNew( t->manager->handle,
770                                             reconnectSoonCB, t, SOON_MSEC );
771}
772
773/**
774***
775**/
776
777static int rechokePulse( void * vtorrent );
778
779static void
780restartChokeTimer( Torrent * t )
781{
782    tr_timerFree( &t->rechokeTimer );
783    if( t->isRunning )
784        t->rechokeTimer = tr_timerNew( t->manager->handle, rechokePulse, t, RECHOKE_PERIOD_MSEC );
785}
786
787static void
788rechokeNow( Torrent * t )
789{
790    rechokePulse( t );
791    restartChokeTimer( t );
792}
793
794static int
795rechokeSoonCB( void * vt )
796{
797    Torrent * t = vt;
798    rechokeNow( t );
799    t->rechokeSoonTimer = NULL;
800    return FALSE;
801}
802
803static void
804rechokeSoon( Torrent * t )
805{
806    if( t->rechokeSoonTimer == NULL )
807        t->rechokeSoonTimer = tr_timerNew( t->manager->handle,
808                                           rechokeSoonCB, t, SOON_MSEC );
809}
810
811static void
812msgsCallbackFunc( void * vpeer, void * vevent, void * vt )
813{
814    tr_peer * peer = vpeer;
815    Torrent * t = (Torrent *) vt;
816    const tr_peermsgs_event * e = (const tr_peermsgs_event *) vevent;
817    const int needLock = !torrentIsLocked( t );
818
819    if( needLock )
820        torrentLock( t );
821
822    switch( e->eventType )
823    {
824        case TR_PEERMSG_NEED_REQ:
825            if( t->refillTimer == NULL )
826                t->refillTimer = tr_timerNew( t->manager->handle,
827                                              refillPulse, t,
828                                              REFILL_PERIOD_MSEC );
829            break;
830
831        case TR_PEERMSG_CLIENT_HAVE:
832            broadcastClientHave( t, e->pieceIndex );
833            tr_torrentRecheckCompleteness( t->tor );
834            break;
835
836        case TR_PEERMSG_PEER_PROGRESS: { /* if we're both seeds, then disconnect. */
837#if 0
838            const int clientIsSeed = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
839            const int peerIsSeed = e->progress >= 1.0;
840            if( clientIsSeed && peerIsSeed )
841                peer->doPurge = 1;
842#endif
843            break;
844        }
845
846        case TR_PEERMSG_CLIENT_BLOCK:
847            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
848            break;
849
850        case TR_PEERMSG_GOT_ERROR:
851            peer->doPurge = 1;
852            reconnectSoon( t );
853            break;
854
855        default:
856            assert(0);
857    }
858
859    if( needLock )
860        torrentUnlock( t );
861}
862
863static void
864myHandshakeDoneCB( tr_handshake    * handshake,
865                   tr_peerIo       * io,
866                   int               isConnected,
867                   const uint8_t   * peer_id,
868                   void            * vmanager )
869{
870    int ok = isConnected;
871    uint16_t port;
872    const struct in_addr * in_addr;
873    tr_peerMgr * manager = (tr_peerMgr*) vmanager;
874    const uint8_t * hash = NULL;
875    Torrent * t;
876    tr_handshake * ours;
877
878    assert( io != NULL );
879    assert( isConnected==0 || isConnected==1 );
880
881    ours = tr_ptrArrayRemoveSorted( manager->handshakes,
882                                    handshake,
883                                    handshakeCompare );
884    assert( ours != NULL );
885    assert( ours == handshake );
886
887    in_addr = tr_peerIoGetAddress( io, &port );
888
889    if( !tr_peerIoHasTorrentHash( io ) ) /* incoming connection gone wrong? */
890    {
891        tr_peerIoFree( io );
892        --manager->connectionCount;
893        return;
894    }
895
896    hash = tr_peerIoGetTorrentHash( io );
897    t = getExistingTorrent( manager, hash );
898
899    if( t != NULL )
900        torrentLock( t );
901
902    if( !t || !t->isRunning )
903    {
904        tr_peerIoFree( io );
905        --manager->connectionCount;
906    }
907    else if( !ok )
908    {
909        /* if we couldn't connect or were snubbed,
910         * the peer's probably not worth remembering. */
911        tr_peer * peer = getExistingPeer( t, in_addr );
912        tr_peerIoFree( io );
913        --manager->connectionCount;
914        if( peer )
915            peer->doPurge = 1;
916    }
917    else /* looking good */
918    {
919        tr_peer * peer = getPeer( t, in_addr );
920        if( peer->msgs != NULL ) { /* we already have this peer */
921            tr_peerIoFree( io );
922            --manager->connectionCount;
923        } else {
924            peer->port = port;
925            peer->io = io;
926            peer->msgs = tr_peerMsgsNew( t->tor, peer );
927            tr_free( peer->client );
928            peer->client = peer_id ? tr_clientForId( peer_id ) : NULL;
929            peer->msgsTag = tr_peerMsgsSubscribe( peer->msgs, msgsCallbackFunc, t );
930            rechokeSoon( t );
931        }
932    }
933
934    if( t != NULL )
935        torrentUnlock( t );
936}
937
938static void
939initiateHandshake( tr_peerMgr * manager, tr_peerIo * io )
940{
941    tr_handshake * handshake;
942
943    assert( manager->lockThread!=0 );
944    assert( io != NULL );
945
946    handshake = tr_handshakeNew( io,
947                                 manager->handle->encryptionMode,
948                                 myHandshakeDoneCB,
949                                 manager );
950    ++manager->connectionCount;
951
952    tr_ptrArrayInsertSorted( manager->handshakes, handshake, handshakeCompare );
953}
954
955void
956tr_peerMgrAddIncoming( tr_peerMgr      * manager,
957                       struct in_addr  * addr,
958                       uint16_t          port,
959                       int               socket )
960{
961    managerLock( manager );
962
963    if( getExistingHandshake( manager, addr ) == NULL )
964    {
965        tr_peerIo * io = tr_peerIoNewIncoming( manager->handle, addr, port, socket );
966        initiateHandshake( manager, io );
967    }
968
969    managerUnlock( manager );
970}
971
972static void
973maybeAddNewAtom( Torrent * t, const struct in_addr * addr, uint16_t port, uint8_t flags, uint8_t from )
974{
975    if( !peerIsKnown( t, addr ) )
976    {
977        struct peer_atom * a = tr_new( struct peer_atom, 1 );
978        a->addr = *addr;
979        a->port = port;
980        a->flags = flags;
981        a->from = from;
982        a->time = 0;
983fprintf( stderr, "torrent [%s] getting a new atom: %s\n", t->tor->info.name, tr_peerIoAddrStr(&a->addr,a->port) );
984        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
985    }
986}
987
988void
989tr_peerMgrAddPex( tr_peerMgr     * manager,
990                  const uint8_t  * torrentHash,
991                  int              from,
992                  const tr_pex   * pex,
993                  int              pexCount )
994{
995    Torrent * t;
996    const tr_pex * end;
997
998    managerLock( manager );
999
1000    t = getExistingTorrent( manager, torrentHash );
1001    for( end=pex+pexCount; pex!=end; ++pex )
1002        maybeAddNewAtom( t, &pex->in_addr, pex->port, pex->flags, from );
1003    reconnectSoon( t );
1004
1005    managerUnlock( manager );
1006}
1007
1008void
1009tr_peerMgrAddPeers( tr_peerMgr    * manager,
1010                    const uint8_t * torrentHash,
1011                    int             from,
1012                    const uint8_t * peerCompact,
1013                    int             peerCount )
1014{
1015    int i;
1016    const uint8_t * walk = peerCompact;
1017    Torrent * t;
1018
1019    managerLock( manager );
1020
1021    t = getExistingTorrent( manager, torrentHash );
1022    for( i=0; t!=NULL && i<peerCount; ++i )
1023    {
1024        struct in_addr addr;
1025        uint16_t port;
1026        memcpy( &addr, walk, 4 ); walk += 4;
1027        memcpy( &port, walk, 2 ); walk += 2;
1028        maybeAddNewAtom( t, &addr, port, 0, from );
1029    }
1030    reconnectSoon( t );
1031
1032    managerUnlock( manager );
1033}
1034
1035/**
1036***
1037**/
1038
1039int
1040tr_peerMgrIsAcceptingConnections( const tr_peerMgr * manager UNUSED )
1041{
1042    return TRUE; /* manager->connectionCount < MAX_CONNECTED_PEERS; */
1043}
1044
1045void
1046tr_peerMgrSetBlame( tr_peerMgr     * manager UNUSED,
1047                    const uint8_t  * torrentHash UNUSED,
1048                    int              pieceIndex UNUSED,
1049                    int              success UNUSED )
1050{
1051    /*fprintf( stderr, "FIXME: tr_peerMgrSetBlame\n" );*/
1052}
1053
1054int
1055tr_pexCompare( const void * va, const void * vb )
1056{
1057    const tr_pex * a = (const tr_pex *) va;
1058    const tr_pex * b = (const tr_pex *) vb;
1059    int i = memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
1060    if( i ) return i;
1061    if( a->port < b->port ) return -1;
1062    if( a->port > b->port ) return 1;
1063    return 0;
1064}
1065
1066int tr_pexCompare( const void * a, const void * b );
1067
1068static int
1069peerPrefersCrypto( const tr_peer * peer )
1070{
1071    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1072        return TRUE;
1073
1074    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1075        return FALSE;
1076
1077    return tr_peerIoIsEncrypted( peer->io );
1078};
1079
1080int
1081tr_peerMgrGetPeers( tr_peerMgr      * manager,
1082                    const uint8_t   * torrentHash,
1083                    tr_pex         ** setme_pex )
1084{
1085    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1086    int i, peerCount;
1087    const int isLocked = torrentIsLocked( t );
1088    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1089    tr_pex * pex = tr_new( tr_pex, peerCount );
1090    tr_pex * walk = pex;
1091
1092    if( !isLocked )
1093        torrentLock( (Torrent*)t );
1094
1095    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1096    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1097    pex = walk = tr_new( tr_pex, peerCount );
1098
1099    for( i=0; i<peerCount; ++i, ++walk )
1100    {
1101        const tr_peer * peer = peers[i];
1102
1103        walk->in_addr = peer->in_addr;
1104
1105        walk->port = peer->port;
1106
1107        walk->flags = 0;
1108        if( peerPrefersCrypto(peer) )  walk->flags |= 1;
1109        if( peer->progress >= 1.0 )    walk->flags |= 2;
1110    }
1111
1112    assert( ( walk - pex ) == peerCount );
1113    qsort( pex, peerCount, sizeof(tr_pex), tr_pexCompare );
1114    *setme_pex = pex;
1115
1116    if( !isLocked )
1117        torrentUnlock( (Torrent*)t );
1118
1119    return peerCount;
1120}
1121
1122void
1123tr_peerMgrStartTorrent( tr_peerMgr     * manager,
1124                        const uint8_t  * torrentHash )
1125{
1126    Torrent * t;
1127
1128    managerLock( manager );
1129
1130    t = getExistingTorrent( manager, torrentHash );
1131    t->isRunning = 1;
1132    restartChokeTimer( t );
1133    reconnectSoon( t );
1134
1135    managerUnlock( manager );
1136}
1137
1138static void
1139stopTorrent( Torrent * t )
1140{
1141    int i, size;
1142    tr_peer ** peers;
1143
1144    assert( torrentIsLocked( t ) );
1145
1146    t->isRunning = 0;
1147    tr_timerFree( &t->rechokeTimer );
1148    tr_timerFree( &t->reconnectTimer );
1149
1150    peers = getConnectedPeers( t, &size );
1151    for( i=0; i<size; ++i )
1152        disconnectPeer( peers[i] );
1153
1154    tr_free( peers );
1155}
1156void
1157tr_peerMgrStopTorrent( tr_peerMgr     * manager,
1158                       const uint8_t  * torrentHash)
1159{
1160    managerLock( manager );
1161
1162    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1163
1164    managerUnlock( manager );
1165}
1166
1167void
1168tr_peerMgrAddTorrent( tr_peerMgr * manager,
1169                      tr_torrent * tor )
1170{
1171    Torrent * t;
1172
1173    managerLock( manager );
1174
1175    assert( tor != NULL );
1176    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1177
1178    t = tr_new0( Torrent, 1 );
1179    t->manager = manager;
1180    t->tor = tor;
1181    t->pool = tr_ptrArrayNew( );
1182    t->peers = tr_ptrArrayNew( );
1183    t->requested = tr_bitfieldNew( tor->blockCount );
1184    restartChokeTimer( t );
1185    restartReconnectTimer( t );
1186
1187    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
1188    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1189
1190    managerUnlock( manager );
1191}
1192
1193void
1194tr_peerMgrRemoveTorrent( tr_peerMgr     * manager,
1195                         const uint8_t  * torrentHash )
1196{
1197    Torrent * t;
1198
1199    managerLock( manager );
1200
1201    t = getExistingTorrent( manager, torrentHash );
1202    assert( t != NULL );
1203    stopTorrent( t );
1204    freeTorrent( manager, t );
1205
1206    managerUnlock( manager );
1207}
1208
1209void
1210tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1211                               const uint8_t    * torrentHash,
1212                               int8_t           * tab,
1213                               int                tabCount )
1214{
1215    int i;
1216    const Torrent * t;
1217    const tr_torrent * tor;
1218    float interval;
1219
1220    managerLock( (tr_peerMgr*)manager );
1221
1222    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1223    tor = t->tor;
1224    interval = tor->info.pieceCount / (float)tabCount;
1225
1226    memset( tab, 0, tabCount );
1227
1228    for( i=0; i<tabCount; ++i )
1229    {
1230        const int piece = i * interval;
1231
1232        if( tor == NULL )
1233            tab[i] = 0;
1234        else if( tr_cpPieceIsComplete( tor->completion, piece ) )
1235            tab[i] = -1;
1236        else {
1237            int j, peerCount;
1238            const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1239            for( j=0; j<peerCount; ++j )
1240                if( tr_bitfieldHas( peers[j]->have, i ) )
1241                    ++tab[i];
1242        }
1243    }
1244
1245    managerUnlock( (tr_peerMgr*)manager );
1246}
1247
1248/* Returns the pieces that we and/or a connected peer has */
1249tr_bitfield*
1250tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1251                        const uint8_t    * torrentHash )
1252{
1253    int i, size;
1254    const Torrent * t;
1255    const tr_peer ** peers;
1256    tr_bitfield * pieces;
1257
1258    managerLock( (tr_peerMgr*)manager );
1259
1260    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1261    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1262    pieces = tr_bitfieldDup( tr_cpPieceBitfield( t->tor->completion ) );
1263    for( i=0; i<size; ++i )
1264        if( peers[i]->io != NULL )
1265            tr_bitfieldAnd( pieces, peers[i]->have );
1266
1267    managerUnlock( (tr_peerMgr*)manager );
1268    return pieces;
1269}
1270
1271void
1272tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1273                        const uint8_t    * torrentHash,
1274                        int              * setmePeersTotal,
1275                        int              * setmePeersConnected,
1276                        int              * setmePeersSendingToUs,
1277                        int              * setmePeersGettingFromUs,
1278                        int              * setmePeersFrom )
1279{
1280    int i, size;
1281    const Torrent * t;
1282    const tr_peer ** peers;
1283
1284    managerLock( (tr_peerMgr*)manager );
1285
1286    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1287    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1288
1289    *setmePeersTotal          = size;
1290    *setmePeersConnected      = 0;
1291    *setmePeersSendingToUs    = 0;
1292    *setmePeersGettingFromUs  = 0;
1293
1294    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1295        setmePeersFrom[i] = 0;
1296
1297    for( i=0; i<size; ++i )
1298    {
1299        const tr_peer * peer = peers[i];
1300
1301        if( peer->io == NULL ) /* not connected */
1302            continue;
1303
1304        ++*setmePeersConnected;
1305
1306        ++setmePeersFrom[peer->from];
1307
1308        if( tr_peerIoGetRateToPeer( peer->io ) > 0.01 )
1309            ++*setmePeersGettingFromUs;
1310
1311        if( tr_peerIoGetRateToClient( peer->io ) > 0.01 )
1312            ++*setmePeersSendingToUs;
1313    }
1314
1315    managerUnlock( (tr_peerMgr*)manager );
1316}
1317
1318struct tr_peer_stat *
1319tr_peerMgrPeerStats( const tr_peerMgr  * manager,
1320                     const uint8_t     * torrentHash,
1321                     int               * setmeCount UNUSED )
1322{
1323    int i, size;
1324    const Torrent * t;
1325    const tr_peer ** peers;
1326    tr_peer_stat * ret;
1327
1328    assert( manager != NULL );
1329    managerLock( (tr_peerMgr*)manager );
1330
1331    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1332    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1333
1334    ret = tr_new0( tr_peer_stat, size );
1335
1336    for( i=0; i<size; ++i )
1337    {
1338        const tr_peer * peer = peers[i];
1339        const int live = peer->io != NULL;
1340        tr_peer_stat * stat = ret + i;
1341
1342        tr_netNtop( &peer->in_addr, stat->addr, sizeof(stat->addr) );
1343        stat->port             = peer->port;
1344        stat->from             = peer->from;
1345        stat->client           = peer->client;
1346        stat->progress         = peer->progress;
1347        stat->isConnected      = live ? 1 : 0;
1348        stat->isEncrypted      = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1349        stat->uploadToRate     = tr_peerIoGetRateToPeer( peer->io );
1350        stat->downloadFromRate = tr_peerIoGetRateToClient( peer->io );
1351        stat->isDownloading    = stat->uploadToRate > 0.01;
1352        stat->isUploading      = stat->downloadFromRate > 0.01;
1353    }
1354
1355    *setmeCount = size;
1356
1357    managerUnlock( (tr_peerMgr*)manager );
1358    return ret;
1359}
1360
1361/**
1362***
1363**/
1364
1365typedef struct
1366{
1367    tr_peer * peer;
1368    float rate;
1369    int randomKey;
1370    int preferred;
1371    int doUnchoke;
1372}
1373ChokeData;
1374
1375static int
1376compareChoke( const void * va, const void * vb )
1377{
1378    const ChokeData * a = ( const ChokeData * ) va;
1379    const ChokeData * b = ( const ChokeData * ) vb;
1380
1381    if( a->preferred != b->preferred )
1382        return a->preferred ? -1 : 1;
1383
1384    if( a->preferred )
1385    {
1386        if( a->rate > b->rate ) return -1;
1387        if( a->rate < b->rate ) return 1;
1388        return 0;
1389    }
1390    else
1391    {
1392        return a->randomKey - b->randomKey;
1393    }
1394}
1395
1396static int
1397clientIsSnubbedBy( const tr_peer * peer )
1398{
1399    assert( peer != NULL );
1400
1401    return peer->peerSentPieceDataAt < (time(NULL) - SNUBBED_SEC);
1402}
1403
1404/**
1405***
1406**/
1407
1408static void
1409rechokeLeech( Torrent * t )
1410{
1411    int i, peerCount, size=0, unchoked=0;
1412    const time_t ignorePeersNewerThan = time(NULL) - MIN_CHOKE_PERIOD_SEC;
1413    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1414    ChokeData * choke = tr_new0( ChokeData, peerCount );
1415
1416    assert( torrentIsLocked( t ) );
1417   
1418    /* sort the peers by preference and rate */
1419    for( i=0; i<peerCount; ++i )
1420    {
1421        tr_peer * peer = peers[i];
1422        ChokeData * node;
1423        if( peer->chokeChangedAt > ignorePeersNewerThan )
1424            continue;
1425
1426        node = &choke[size++];
1427        node->peer = peer;
1428        node->preferred = peer->peerIsInterested && !clientIsSnubbedBy(peer);
1429        node->randomKey = tr_rand( INT_MAX );
1430        node->rate = tr_peerIoGetRateToClient( peer->io );
1431    }
1432
1433    qsort( choke, size, sizeof(ChokeData), compareChoke );
1434
1435    for( i=0; i<size && i<NUM_UNCHOKED_PEERS_PER_TORRENT; ++i ) {
1436        choke[i].doUnchoke = 1;
1437        ++unchoked;
1438    }
1439
1440    for( ; i<size; ++i ) {
1441        choke[i].doUnchoke = 1;
1442        ++unchoked;
1443        if( choke[i].peer->peerIsInterested )
1444            break;
1445    }
1446
1447    for( i=0; i<size; ++i )
1448        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1449
1450    /* cleanup */
1451    tr_free( choke );
1452    tr_free( peers );
1453}
1454
1455static void
1456rechokeSeed( Torrent * t )
1457{
1458    int i, size;
1459    tr_peer ** peers;
1460
1461    assert( torrentIsLocked( t ) );
1462
1463    peers = getConnectedPeers( t, &size );
1464
1465    /* FIXME */
1466    for( i=0; i<size; ++i )
1467        tr_peerMsgsSetChoke( peers[i]->msgs, FALSE );
1468
1469    tr_free( peers );
1470}
1471
1472static int
1473rechokePulse( void * vtorrent )
1474{
1475    Torrent * t = vtorrent;
1476    torrentLock( t );
1477
1478    const int done = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1479    if( done )
1480        rechokeLeech( vtorrent );
1481    else
1482        rechokeSeed( vtorrent );
1483
1484    torrentUnlock( t );
1485    return TRUE;
1486}
1487
1488/***
1489****
1490****
1491****
1492***/
1493
1494struct tr_connection
1495{
1496    tr_peer * peer;
1497    double throughput;
1498};
1499
1500#define LAISSEZ_FAIRE_PERIOD_SECS 60
1501
1502static int
1503compareConnections( const void * va, const void * vb )
1504{
1505    const struct tr_connection * a = va;
1506    const struct tr_connection * b = vb;
1507    if( a->throughput < b->throughput ) return -1;
1508    if( a->throughput > b->throughput ) return 1;
1509    return 0;
1510}
1511
1512static struct tr_connection *
1513getWeakConnections( Torrent * t, int * setmeSize )
1514{
1515    int i, insize, outsize;
1516    const int seeding = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1517    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( t->peers, &insize );
1518    struct tr_connection * ret = tr_new( struct tr_connection, insize );
1519    const time_t now = time( NULL );
1520
1521    assert( torrentIsLocked( t ) );
1522
1523    for( i=outsize=0; i<insize; ++i )
1524    {
1525        tr_peer * peer = peers[i];
1526        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1527        const double throughput = seeding ? tr_peerIoGetRateToPeer( peer->io )
1528                                          : tr_peerIoGetRateToClient( peer->io );
1529
1530        if( ( now - atom->time ) < LAISSEZ_FAIRE_PERIOD_SECS )
1531            continue;
1532
1533        if( throughput >= 2 )
1534            continue;
1535
1536        ret[outsize].peer = peer;
1537        ret[outsize].throughput = throughput;
1538        ++outsize;
1539    }
1540
1541    qsort( ret, outsize, sizeof(struct tr_connection), compareConnections );
1542    *setmeSize = outsize;
1543    return ret;
1544}
1545
1546static int
1547compareAtomByTime( const void * va, const void * vb )
1548{
1549    const struct peer_atom * a = * (const struct peer_atom**) va;
1550    const struct peer_atom * b = * (const struct peer_atom**) vb;
1551    if( a->time < b->time ) return -1;
1552    if( a->time > b->time ) return 1;
1553    return 0;
1554}
1555
1556static struct peer_atom **
1557getPeerCandidates( Torrent * t, int * setmeSize )
1558{
1559    int i, insize, outsize;
1560    struct peer_atom ** atoms;
1561    struct peer_atom ** ret;
1562    const time_t now = time( NULL );
1563    const int seed = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1564
1565    assert( torrentIsLocked( t ) );
1566
1567    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &insize );
1568    ret = tr_new( struct peer_atom*, insize );
1569    for( i=outsize=0; i<insize; ++i )
1570    {
1571        struct peer_atom * atom = atoms[i];
1572
1573        /* we don't need two connections to the same peer... */
1574        if( peerIsInUse( t, &atom->addr ) ) {
1575            fprintf( stderr, "RECONNECT peer %d (%s) is in use...\n", i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1576            continue;
1577        }
1578
1579        /* no need to connect if we're both seeds... */
1580        if( seed && ( atom->flags & 2 ) ) {
1581            fprintf( stderr, "RECONNECT peer %d (%s) is a seed and so are we...\n", i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1582            continue;
1583        }
1584
1585        /* if we used this peer recently, give someone else a turn */
1586        if( ( now - atom->time ) <  LAISSEZ_FAIRE_PERIOD_SECS ) {
1587            fprintf( stderr, "RECONNECT peer %d (%s) is in its grace period...\n", i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1588            continue;
1589        }
1590
1591        ret[outsize++] = atom;
1592    }
1593
1594    qsort( ret, outsize, sizeof(struct peer_atom*), compareAtomByTime );
1595    *setmeSize = outsize;
1596    return ret;
1597}
1598
1599static int
1600reconnectPulse( void * vtorrent )
1601{
1602    Torrent * t = vtorrent;
1603    struct peer_atom ** candidates;
1604    struct tr_connection * connections;
1605    int i, nCandidates, nConnections, nCull, nAdd;
1606
1607    torrentLock( t );
1608
1609    connections = getWeakConnections( t, &nConnections );
1610    candidates = getPeerCandidates( t, &nCandidates );
1611
1612    /* figure out how many peers to disconnect */
1613    nCull = nConnections-4; 
1614
1615fprintf( stderr, "RECONNECT pulse for [%s]: %d connections, %d candidates, %d atoms, %d cull\n", t->tor->info.name, nConnections, nCandidates, tr_ptrArraySize(t->pool), nCull );
1616
1617for( i=0; i<nConnections; ++i )
1618fprintf( stderr, "connection #%d: %s @ %.2f\n", i+1, tr_peerIoAddrStr( &connections[i].peer->in_addr, connections[i].peer->port ), connections[i].throughput );
1619
1620    /* disconnect some peers */
1621    for( i=0; i<nCull && i<nConnections; ++i ) {
1622        const double throughput = connections[i].throughput;
1623        tr_peer * peer = connections[i].peer;
1624        fprintf( stderr, "RECONNECT culling peer %s, whose throughput was %f\n", tr_peerIoAddrStr(&peer->in_addr, peer->port), throughput );
1625        removePeer( t, peer );
1626    }
1627
1628    /* add some new ones */
1629    nAdd = MAX_CONNECTED_PEERS_PER_TORRENT - nConnections;
1630    for( i=0; i<nAdd && i<nCandidates; ++i ) {
1631        struct peer_atom * atom = candidates[i];
1632        tr_peerIo * io = tr_peerIoNewOutgoing( t->manager->handle, &atom->addr, atom->port, t->hash );
1633fprintf( stderr, "RECONNECT adding an outgoing connection...\n" );
1634        initiateHandshake( t->manager, io );
1635        atom->time = time( NULL );
1636    }
1637
1638    /* cleanup */
1639    tr_free( connections );
1640    tr_free( candidates );
1641    torrentUnlock( t );
1642    return TRUE;
1643}
Note: See TracBrowser for help on using the repository browser.