source: trunk/libtransmission/peer-mgr.c @ 3248

Last change on this file since 3248 was 3248, checked in by charles, 15 years ago

ensure TR_PEER_FROM_INCOMING peers get peer atoms. this should fix BentMyWookie?'s crash.

  • Property svn:keywords set to Date Rev Author Id
File size: 43.8 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 3248 2007-10-01 05:32:34Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17#include <limits.h> /* INT_MAX */
18
19#include "transmission.h"
20#include "clients.h"
21#include "completion.h"
22#include "crypto.h"
23#include "handshake.h"
24#include "net.h"
25#include "peer-io.h"
26#include "peer-mgr.h"
27#include "peer-mgr-private.h"
28#include "peer-msgs.h"
29#include "platform.h"
30#include "ptrarray.h"
31#include "ratecontrol.h"
32#include "trevent.h"
33#include "utils.h"
34
35#include "pthread.h"
36
37enum
38{
39    /* how frequently to change which peers are choked */
40    RECHOKE_PERIOD_MSEC = (15 * 1000),
41
42    /* how frequently to decide which peers live and die */
43    RECONNECT_PERIOD_MSEC = (15 * 1000),
44
45    /* how frequently to refill peers' request lists */
46    REFILL_PERIOD_MSEC = 1000,
47
48    /* how many peers to unchoke per-torrent. */
49    /* FIXME: make this user-configurable? */
50    NUM_UNCHOKED_PEERS_PER_TORRENT = 8,
51
52    /* don't change a peer's choke status more often than this */
53    MIN_CHOKE_PERIOD_SEC = 10,
54
55    /* how soon is `soon' in the rechokeSoon, reconnecSoon funcs */
56    SOON_MSEC = 1000,
57
58    /* following the BT spec, we consider ourselves `snubbed' if
59     * we're we don't get piece data from a peer in this long */
60    SNUBBED_SEC = 60,
61
62    /* this is arbitrary and, hopefully, temporary until we come up
63     * with a better idea for managing the connection limits */
64    MAX_CONNECTED_PEERS_PER_TORRENT = 60,
65};
66
67/**
68***
69**/
70
71/* We keep one of these for every peer we know about, whether
72 * it's connected or not, so the struct must be small.
73 * When our current connections underperform, we dip back
74 * int this list for new ones. */
75struct peer_atom
76{   
77    uint8_t from;
78    uint8_t flags; /* these match the added_f flags */
79    uint16_t port;
80    struct in_addr addr; 
81    time_t time;
82};
83
84typedef struct
85{
86    uint8_t hash[SHA_DIGEST_LENGTH];
87    tr_ptrArray * pool; /* struct peer_atom */
88    tr_ptrArray * peers; /* tr_peer */
89    tr_timer * reconnectTimer;
90    tr_timer * reconnectSoonTimer;
91    tr_timer * rechokeTimer;
92    tr_timer * rechokeSoonTimer;
93    tr_timer * refillTimer;
94    tr_torrent * tor;
95    tr_bitfield * requested;
96
97    unsigned int isRunning : 1;
98
99    struct tr_peerMgr * manager;
100}
101Torrent;
102
103struct tr_peerMgr
104{
105    tr_handle * handle;
106    tr_ptrArray * torrents; /* Torrent */
107    int connectionCount;
108    tr_ptrArray * handshakes; /* in-process */
109    tr_lock * lock;
110    pthread_t lockThread;
111};
112
113/**
114***
115**/
116
117static void
118managerLock( struct tr_peerMgr * manager )
119{
120    assert( manager->lockThread != pthread_self() );
121    tr_lockLock( manager->lock );
122    manager->lockThread = pthread_self();
123}
124static void
125managerUnlock( struct tr_peerMgr * manager )
126{
127    assert( manager->lockThread == pthread_self() );
128    manager->lockThread = 0;
129    tr_lockUnlock( manager->lock );
130}
131static void
132torrentLock( Torrent * torrent )
133{
134    managerLock( torrent->manager );
135}
136static void
137torrentUnlock( Torrent * torrent )
138{
139    managerUnlock( torrent->manager );
140}
141static int
142torrentIsLocked( const Torrent * t )
143{
144    return ( t != NULL )
145        && ( t->manager != NULL )
146        && ( t->manager->lockThread != 0 )
147        && ( t->manager->lockThread == pthread_self( ) );
148}
149
150/**
151***
152**/
153
154static int
155compareAddresses( const struct in_addr * a, const struct in_addr * b )
156{
157    return tr_compareUint32( a->s_addr, b->s_addr );
158}
159
160static int
161handshakeCompareToAddr( const void * va, const void * vb )
162{
163    const tr_handshake * a = va;
164    return compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
165}
166
167static int
168handshakeCompare( const void * a, const void * b )
169{
170    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
171}
172
173static tr_handshake*
174getExistingHandshake( tr_peerMgr * mgr, const struct in_addr * in_addr )
175{
176    return tr_ptrArrayFindSorted( mgr->handshakes,
177                                  in_addr,
178                                  handshakeCompareToAddr );
179}
180
181static int
182comparePeerAtomToAddress( const void * va, const void * vb )
183{
184    const struct peer_atom * a = va;
185    return compareAddresses( &a->addr, vb );
186}
187
188static int
189comparePeerAtoms( const void * va, const void * vb )
190{
191    const struct peer_atom * b = vb;
192    return comparePeerAtomToAddress( va, &b->addr );
193}
194
195/**
196***
197**/
198
199static int
200torrentCompare( const void * va, const void * vb )
201{
202    const Torrent * a = va;
203    const Torrent * b = vb;
204    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
205}
206
207static int
208torrentCompareToHash( const void * va, const void * vb )
209{
210    const Torrent * a = (const Torrent*) va;
211    const uint8_t * b_hash = (const uint8_t*) vb;
212    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
213}
214
215static Torrent*
216getExistingTorrent( tr_peerMgr * manager, const uint8_t * hash )
217{
218    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
219                                             hash,
220                                             torrentCompareToHash );
221}
222
223static int
224peerCompare( const void * va, const void * vb )
225{
226    const tr_peer * a = (const tr_peer *) va;
227    const tr_peer * b = (const tr_peer *) vb;
228    return compareAddresses( &a->in_addr, &b->in_addr );
229}
230
231static int
232peerCompareToAddr( const void * va, const void * vb )
233{
234    const tr_peer * a = (const tr_peer *) va;
235    return compareAddresses( &a->in_addr, vb );
236}
237
238static tr_peer*
239getExistingPeer( Torrent * torrent, const struct in_addr * in_addr )
240{
241    assert( torrentIsLocked( torrent ) );
242    assert( in_addr != NULL );
243
244    return (tr_peer*) tr_ptrArrayFindSorted( torrent->peers,
245                                             in_addr,
246                                             peerCompareToAddr );
247}
248
249static struct peer_atom*
250getExistingAtom( const Torrent * t, const struct in_addr * addr )
251{
252    assert( torrentIsLocked( t ) );
253    return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
254}
255
256static int
257peerIsKnown( const Torrent * t, const struct in_addr * addr )
258{
259    return getExistingAtom( t, addr ) != NULL;
260}
261
262static int
263peerIsInUse( const Torrent * t, const struct in_addr * addr )
264{
265    assert( torrentIsLocked ( t ) );
266
267    return ( getExistingPeer( (Torrent*)t, addr ) != NULL )
268        || ( getExistingHandshake( ((Torrent*)t)->manager, addr ) != NULL );
269}
270
271static tr_peer*
272getPeer( Torrent * torrent, const struct in_addr * in_addr )
273{
274    tr_peer * peer;
275
276    assert( torrentIsLocked( torrent ) );
277
278    peer = getExistingPeer( torrent, in_addr );
279
280    if( peer == NULL )
281    {
282        peer = tr_new0( tr_peer, 1 );
283        memcpy( &peer->in_addr, in_addr, sizeof(struct in_addr) );
284        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
285    }
286
287    return peer;
288}
289
290static void
291disconnectPeer( tr_peer * peer )
292{
293    assert( peer != NULL );
294
295    tr_peerIoFree( peer->io );
296    peer->io = NULL;
297
298    if( peer->msgs != NULL )
299    {
300        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
301        tr_peerMsgsFree( peer->msgs );
302        peer->msgs = NULL;
303    }
304
305    tr_bitfieldFree( peer->have );
306    peer->have = NULL;
307
308    tr_bitfieldFree( peer->blame );
309    peer->blame = NULL;
310
311    tr_bitfieldFree( peer->banned );
312    peer->banned = NULL;
313}
314
315static void
316freePeer( tr_peer * peer )
317{
318    disconnectPeer( peer );
319    tr_free( peer->client );
320    tr_free( peer );
321}
322
323static void
324removePeer( Torrent * t, tr_peer * peer )
325{
326    tr_peer * removed;
327    struct peer_atom * atom;
328
329    assert( torrentIsLocked( t ) );
330
331    atom = getExistingAtom( t, &peer->in_addr );
332    assert( atom != NULL );
333    atom->time = time( NULL );
334
335    removed = tr_ptrArrayRemoveSorted  ( t->peers, peer, peerCompare );
336    assert( removed == peer );
337    freePeer( removed );
338}
339
340static void
341freeTorrent( tr_peerMgr * manager, Torrent * t )
342{
343    uint8_t hash[SHA_DIGEST_LENGTH];
344
345    assert( t != NULL );
346    assert( t->peers != NULL );
347    assert( torrentIsLocked( t ) );
348    assert( getExistingTorrent( manager, t->hash ) != NULL );
349
350    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
351
352    tr_timerFree( &t->reconnectTimer );
353    tr_timerFree( &t->reconnectSoonTimer );
354    tr_timerFree( &t->rechokeTimer );
355    tr_timerFree( &t->rechokeSoonTimer );
356    tr_timerFree( &t->refillTimer );
357
358    tr_bitfieldFree( t->requested );
359    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
360    tr_ptrArrayFree( t->peers, (PtrArrayForeachFunc)freePeer );
361    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
362    tr_free( t );
363
364    assert( getExistingTorrent( manager, hash ) == NULL );
365}
366
367/**
368***
369**/
370
371struct tr_bitfield *
372tr_peerMgrGenerateAllowedSet( const uint32_t         setCount,
373                              const uint32_t         pieceCount,
374                              const uint8_t          infohash[20],
375                              const struct in_addr * ip )
376{
377    /* This has been checked against the spec example implementation. Feeding it with :
378    setCount = 9, pieceCount = 1313, infohash = Oxaa,0xaa,...0xaa, ip = 80.4.4.200
379generate :
380    1059, 431, 808, 1217, 287, 376, 1188, 353, 508
381    but since we're storing in a bitfield, it won't be in this order... */
382    /* TODO : We should translate link-local IPv4 adresses to external IP,
383     * so that being on same local network gives us the same allowed pieces */
384   
385    printf( "%d piece allowed fast set for torrent with %d pieces and hex infohash\n", setCount, pieceCount );
386    printf( "%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x%x for node with IP %s:\n",
387            infohash[0], infohash[1], infohash[2], infohash[3], infohash[4], infohash[5], infohash[6], infohash[7], infohash[8], infohash[9],
388            infohash[10], infohash[11], infohash[12], infohash[13], infohash[14], infohash[15], infohash[16], infohash[7], infohash[18], infohash[19],
389            inet_ntoa( *ip ) );
390   
391    uint8_t *seed = malloc(4 + SHA_DIGEST_LENGTH);
392    char buf[4];
393    uint32_t allowedPieceCount = 0;
394    tr_bitfield_t * ret;
395   
396    ret = tr_bitfieldNew( pieceCount );
397   
398    /* We need a seed based on most significant bytes of peer address
399        concatenated with torrent's infohash */
400    *(uint32_t*)buf = ntohl( htonl(ip->s_addr) & 0xffffff00 );
401   
402    memcpy( seed, &buf, 4 );
403    memcpy( seed + 4, infohash, SHA_DIGEST_LENGTH );
404   
405    tr_sha1( seed, seed, 4 + SHA_DIGEST_LENGTH, NULL );
406   
407    while ( allowedPieceCount < setCount )
408    {
409        int i;
410        for ( i = 0 ; i < 5 && allowedPieceCount < setCount ; i++ )
411        {
412            /* We generate indices from 4-byte chunks of the seed */
413            uint32_t j = i * 4;
414            uint32_t y = ntohl( *(uint32_t*)(seed + j) );
415            uint32_t index = y % pieceCount;
416           
417            if ( !tr_bitfieldHas( ret, index ) )
418            {
419                tr_bitfieldAdd( ret, index );
420                allowedPieceCount++;
421            }
422        }
423        /* We randomize the seed, in case we need to iterate more */
424        tr_sha1( seed, seed, SHA_DIGEST_LENGTH, NULL );
425    }
426    tr_free( seed );
427   
428    return ret;
429}
430
431tr_peerMgr*
432tr_peerMgrNew( tr_handle * handle )
433{
434    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
435    m->handle = handle;
436    m->torrents = tr_ptrArrayNew( );
437    m->handshakes = tr_ptrArrayNew( );
438    m->lock = tr_lockNew( );
439    return m;
440}
441
442void
443tr_peerMgrFree( tr_peerMgr * manager )
444{
445    managerLock( manager );
446
447    tr_ptrArrayFree( manager->handshakes, (PtrArrayForeachFunc)tr_handshakeAbort );
448    tr_ptrArrayFree( manager->torrents, (PtrArrayForeachFunc)freeTorrent );
449
450    managerUnlock( manager );
451    tr_lockFree( manager->lock );
452    tr_free( manager );
453}
454
455static tr_peer**
456getConnectedPeers( Torrent * t, int * setmeCount )
457{
458    int i, peerCount, connectionCount;
459    tr_peer **peers;
460    tr_peer **ret;
461
462    assert( torrentIsLocked( t ) );
463
464    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
465    ret = tr_new( tr_peer*, peerCount );
466
467    for( i=connectionCount=0; i<peerCount; ++i )
468        if( peers[i]->msgs != NULL )
469            ret[connectionCount++] = peers[i];
470
471    *setmeCount = connectionCount;
472    return ret;
473}
474
475/***
476****  Refill
477***/
478
479struct tr_refill_piece
480{
481    tr_priority_t priority;
482    uint32_t piece;
483    uint32_t peerCount;
484    uint32_t fastAllowed;
485};
486
487static int
488compareRefillPiece (const void * aIn, const void * bIn)
489{
490    const struct tr_refill_piece * a = aIn;
491    const struct tr_refill_piece * b = bIn;
492
493    /* if one piece has a higher priority, it goes first */
494    if (a->priority != b->priority)
495        return a->priority > b->priority ? -1 : 1;
496
497    /* otherwise if one has fewer peers, it goes first */
498    if (a->peerCount != b->peerCount)
499        return a->peerCount < b->peerCount ? -1 : 1;
500   
501    /* otherwise if one *might be* fastallowed to us */
502    if (a->fastAllowed != b->fastAllowed)
503        return a->fastAllowed < b->fastAllowed ? -1 : 1;
504
505    /* otherwise go with the earlier piece */
506    return a->piece - b->piece;
507}
508
509static int
510isPieceInteresting( const tr_torrent  * tor,
511                    int                 piece )
512{
513    if( tor->info.pieces[piece].dnd ) /* we don't want it */
514        return 0;
515
516    if( tr_cpPieceIsComplete( tor->completion, piece ) ) /* we already have it */
517        return 0;
518
519    return 1;
520}
521
522static uint32_t*
523getPreferredPieces( Torrent     * t,
524                    uint32_t    * pieceCount )
525{
526    const tr_torrent * tor = t->tor;
527    const tr_info * inf = &tor->info;
528    int i;
529    uint32_t poolSize = 0;
530    uint32_t * pool = tr_new( uint32_t, inf->pieceCount );
531    int peerCount;
532    tr_peer** peers;
533
534    assert( torrentIsLocked( t ) );
535
536    peers = getConnectedPeers( t, &peerCount );
537
538    for( i=0; i<inf->pieceCount; ++i )
539        if( isPieceInteresting( tor, i ) )
540            pool[poolSize++] = i;
541
542    /* sort the pool from most interesting to least... */
543    if( poolSize > 1 )
544    {
545        uint32_t j;
546        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
547
548        for( j=0; j<poolSize; ++j )
549        {
550            int k;
551            const int piece = pool[j];
552            struct tr_refill_piece * setme = p + j;
553
554            setme->piece = piece;
555            setme->priority = inf->pieces[piece].priority;
556            setme->peerCount = 0;
557            setme->fastAllowed = 0;
558            /* FIXME */
559//            setme->fastAllowed = tr_bitfieldHas( t->tor->allowedList, i);
560
561            for( k=0; k<peerCount; ++k ) {
562                const tr_peer * peer = peers[k];
563                if( peer->peerIsInterested && !peer->clientIsChoked && tr_bitfieldHas( peer->have, piece ) )
564                    ++setme->peerCount;
565            }
566        }
567
568        qsort (p, poolSize, sizeof(struct tr_refill_piece), compareRefillPiece);
569
570        for( j=0; j<poolSize; ++j )
571            pool[j] = p[j].piece;
572
573        tr_free( p );
574    }
575
576#if 0
577fprintf (stderr, "new pool: ");
578for (i=0; i<15 && i<(int)poolSize; ++i ) fprintf (stderr, "%d, ", (int)pool[i] );
579fprintf (stderr, "\n");
580#endif
581    tr_free( peers );
582
583    *pieceCount = poolSize;
584    return pool;
585}
586
587static uint64_t*
588getPreferredBlocks( Torrent * t, uint64_t * setmeCount )
589{
590    uint32_t i;
591    uint32_t pieceCount;
592    uint32_t * pieces;
593    uint64_t *req, *unreq, *ret, *walk;
594    int reqCount, unreqCount;
595    const tr_torrent * tor = t->tor;
596
597    assert( torrentIsLocked( t ) );
598
599    pieces = getPreferredPieces( t, &pieceCount );
600/*fprintf( stderr, "REFILL refillPulse for {%s} got %d of %d pieces\n", tor->info.name, (int)pieceCount, t->tor->info.pieceCount );*/
601
602    req = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
603    reqCount = 0;
604    unreq = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
605    unreqCount = 0;
606
607    for( i=0; i<pieceCount; ++i ) {
608        const uint32_t index = pieces[i];
609        const int begin = tr_torPieceFirstBlock( tor, index );
610        const int end = begin + tr_torPieceCountBlocks( tor, (int)index );
611        int block;
612        for( block=begin; block<end; ++block )
613            if( tr_cpBlockIsComplete( tor->completion, block ) )
614                continue;
615            else if( tr_bitfieldHas( t->requested, block ) )
616                req[reqCount++] = block;
617            else
618                unreq[unreqCount++] = block;
619    }
620
621/*fprintf( stderr, "REFILL refillPulse for {%s} reqCount is %d, unreqCount is %d\n", tor->info.name, (int)reqCount, (int)unreqCount );*/
622    ret = walk = tr_new( uint64_t, unreqCount + reqCount );
623    memcpy( walk, unreq, sizeof(uint64_t) * unreqCount );
624    walk += unreqCount;
625    memcpy( walk, req, sizeof(uint64_t) * reqCount );
626    walk += reqCount;
627    assert( ( walk - ret ) == ( unreqCount + reqCount ) );
628    *setmeCount = walk - ret;
629
630    tr_free( req );
631    tr_free( unreq );
632    tr_free( pieces );
633
634    return ret;
635}
636
637static int
638refillPulse( void * vtorrent )
639{
640    Torrent * t = vtorrent;
641    tr_torrent * tor = t->tor;
642    uint32_t i;
643    int peerCount;
644    tr_peer ** peers;
645    uint64_t blockCount;
646    uint64_t * blocks;
647
648    if( !t->isRunning )
649        return TRUE;
650    if( tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE )
651        return TRUE;
652
653    torrentLock( t );
654
655    blocks = getPreferredBlocks( t, &blockCount );
656    peers = getConnectedPeers( t, &peerCount );
657
658/*fprintf( stderr, "REFILL refillPulse for {%s} got %d blocks\n", tor->info.name, (int)blockCount );*/
659
660    for( i=0; peerCount && i<blockCount; ++i )
661    {
662        const int block = blocks[i];
663        const uint32_t index = tr_torBlockPiece( tor, block );
664        const uint32_t begin = (block * tor->blockSize) - (index * tor->info.pieceSize);
665        const uint32_t length = tr_torBlockCountBytes( tor, block );
666        int j;
667        assert( _tr_block( tor, index, begin ) == block );
668        assert( begin < (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
669        assert( (begin + length) <= (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
670
671
672        /* find a peer who can ask for this block */
673        for( j=0; j<peerCount; )
674        {
675            const int val = tr_peerMsgsAddRequest( peers[j]->msgs, index, begin, length );
676            switch( val )
677            {
678                case TR_ADDREQ_FULL: 
679                case TR_ADDREQ_CLIENT_CHOKED:
680                    memmove( peers+j, peers+j+1, sizeof(tr_peer*)*(--peerCount-j) );
681                    break;
682
683                case TR_ADDREQ_MISSING: 
684                    ++j;
685                    break;
686
687                case TR_ADDREQ_OK:
688                    /*fprintf( stderr, "REFILL peer %p took the request for block %d\n", peers[j]->msgs, block );*/
689                    tr_bitfieldAdd( t->requested, block );
690                    j = peerCount;
691                    break;
692
693                default:
694                    assert( 0 && "unhandled value" );
695                    break;
696            }
697        }
698    }
699
700    /* cleanup */
701    tr_free( peers );
702    tr_free( blocks );
703
704    t->refillTimer = NULL;
705
706    torrentUnlock( t );
707    return FALSE;
708}
709
710static void
711broadcastClientHave( Torrent * t, uint32_t index )
712{
713    int i, size;
714    tr_peer ** peers;
715
716    assert( torrentIsLocked( t ) );
717
718    peers = getConnectedPeers( t, &size );
719    for( i=0; i<size; ++i )
720        tr_peerMsgsHave( peers[i]->msgs, index );
721    tr_free( peers );
722}
723
724static void
725broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
726{
727    int i, size;
728    tr_peer ** peers;
729
730    assert( torrentIsLocked( t ) );
731
732    peers = getConnectedPeers( t, &size );
733    for( i=0; i<size; ++i )
734        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
735    tr_free( peers );
736}
737
738/**
739***
740**/
741
742static int reconnectPulse( void * vtorrent );
743
744static void
745restartReconnectTimer( Torrent * t )
746{
747    tr_timerFree( &t->reconnectTimer );
748    if( t->isRunning )
749        t->reconnectTimer = tr_timerNew( t->manager->handle, reconnectPulse, t, RECONNECT_PERIOD_MSEC );
750}
751
752static void
753reconnectNow( Torrent * t )
754{
755    reconnectPulse( t );
756    restartReconnectTimer( t );
757}
758
759static int
760reconnectSoonCB( void * vt )
761{
762    Torrent * t = vt;
763    reconnectNow( t );
764    t->reconnectSoonTimer = NULL;
765    return FALSE;
766}
767
768static void
769reconnectSoon( Torrent * t )
770{
771    if( t->reconnectSoonTimer == NULL )
772        t->reconnectSoonTimer = tr_timerNew( t->manager->handle,
773                                             reconnectSoonCB, t, SOON_MSEC );
774}
775
776/**
777***
778**/
779
780static int rechokePulse( void * vtorrent );
781
782static void
783restartChokeTimer( Torrent * t )
784{
785    tr_timerFree( &t->rechokeTimer );
786    if( t->isRunning )
787        t->rechokeTimer = tr_timerNew( t->manager->handle, rechokePulse, t, RECHOKE_PERIOD_MSEC );
788}
789
790static void
791rechokeNow( Torrent * t )
792{
793    rechokePulse( t );
794    restartChokeTimer( t );
795}
796
797static int
798rechokeSoonCB( void * vt )
799{
800    Torrent * t = vt;
801    rechokeNow( t );
802    t->rechokeSoonTimer = NULL;
803    return FALSE;
804}
805
806static void
807rechokeSoon( Torrent * t )
808{
809    if( t->rechokeSoonTimer == NULL )
810        t->rechokeSoonTimer = tr_timerNew( t->manager->handle,
811                                           rechokeSoonCB, t, SOON_MSEC );
812}
813
814static void
815msgsCallbackFunc( void * vpeer, void * vevent, void * vt )
816{
817    tr_peer * peer = vpeer;
818    Torrent * t = (Torrent *) vt;
819    const tr_peermsgs_event * e = (const tr_peermsgs_event *) vevent;
820    const int needLock = !torrentIsLocked( t );
821
822    if( needLock )
823        torrentLock( t );
824
825    switch( e->eventType )
826    {
827        case TR_PEERMSG_NEED_REQ:
828            if( t->refillTimer == NULL )
829                t->refillTimer = tr_timerNew( t->manager->handle,
830                                              refillPulse, t,
831                                              REFILL_PERIOD_MSEC );
832            break;
833
834        case TR_PEERMSG_CLIENT_HAVE:
835            broadcastClientHave( t, e->pieceIndex );
836            tr_torrentRecheckCompleteness( t->tor );
837            break;
838
839        case TR_PEERMSG_PEER_PROGRESS: { /* if we're both seeds, then disconnect. */
840#if 0
841            const int clientIsSeed = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
842            const int peerIsSeed = e->progress >= 1.0;
843            if( clientIsSeed && peerIsSeed )
844                peer->doPurge = 1;
845#endif
846            break;
847        }
848
849        case TR_PEERMSG_CLIENT_BLOCK:
850            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
851            break;
852
853        case TR_PEERMSG_GOT_ERROR:
854            peer->doPurge = 1;
855            reconnectSoon( t );
856            break;
857
858        default:
859            assert(0);
860    }
861
862    if( needLock )
863        torrentUnlock( t );
864}
865
866static void
867ensureAtomExists( Torrent * t, const struct in_addr * addr, uint16_t port, uint8_t flags, uint8_t from )
868{
869    if( !peerIsKnown( t, addr ) )
870    {
871        struct peer_atom * a = tr_new( struct peer_atom, 1 );
872        a->addr = *addr;
873        a->port = port;
874        a->flags = flags;
875        a->from = from;
876        a->time = 0;
877fprintf( stderr, "torrent [%s] getting a new atom: %s\n", t->tor->info.name, tr_peerIoAddrStr(&a->addr,a->port) );
878        tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
879    }
880}
881
882static void
883myHandshakeDoneCB( tr_handshake    * handshake,
884                   tr_peerIo       * io,
885                   int               isConnected,
886                   const uint8_t   * peer_id,
887                   void            * vmanager )
888{
889    int ok = isConnected;
890    uint16_t port;
891    const struct in_addr * in_addr;
892    tr_peerMgr * manager = (tr_peerMgr*) vmanager;
893    const uint8_t * hash = NULL;
894    Torrent * t;
895    tr_handshake * ours;
896
897    assert( io != NULL );
898    assert( isConnected==0 || isConnected==1 );
899
900    ours = tr_ptrArrayRemoveSorted( manager->handshakes,
901                                    handshake,
902                                    handshakeCompare );
903    assert( ours != NULL );
904    assert( ours == handshake );
905
906    in_addr = tr_peerIoGetAddress( io, &port );
907
908    if( !tr_peerIoHasTorrentHash( io ) ) /* incoming connection gone wrong? */
909    {
910        tr_peerIoFree( io );
911        --manager->connectionCount;
912        return;
913    }
914
915    hash = tr_peerIoGetTorrentHash( io );
916    t = getExistingTorrent( manager, hash );
917
918    if( t != NULL )
919        torrentLock( t );
920
921    if( !t || !t->isRunning )
922    {
923        tr_peerIoFree( io );
924        --manager->connectionCount;
925    }
926    else if( !ok )
927    {
928        /* if we couldn't connect or were snubbed,
929         * the peer's probably not worth remembering. */
930        tr_peer * peer = getExistingPeer( t, in_addr );
931        tr_peerIoFree( io );
932        --manager->connectionCount;
933        if( peer )
934            peer->doPurge = 1;
935    }
936    else /* looking good */
937    {
938        tr_peer * peer = getPeer( t, in_addr );
939        uint16_t port;
940        const struct in_addr * addr = tr_peerIoGetAddress( io,  &port );
941        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
942        if( peer->msgs != NULL ) { /* we already have this peer */
943            tr_peerIoFree( io );
944            --manager->connectionCount;
945        } else {
946            peer->port = port;
947            peer->io = io;
948            peer->msgs = tr_peerMsgsNew( t->tor, peer );
949            tr_free( peer->client );
950            peer->client = peer_id ? tr_clientForId( peer_id ) : NULL;
951            peer->msgsTag = tr_peerMsgsSubscribe( peer->msgs, msgsCallbackFunc, t );
952            rechokeSoon( t );
953        }
954    }
955
956    if( t != NULL )
957        torrentUnlock( t );
958}
959
960static void
961initiateHandshake( tr_peerMgr * manager, tr_peerIo * io )
962{
963    tr_handshake * handshake;
964
965    assert( manager->lockThread!=0 );
966    assert( io != NULL );
967
968    handshake = tr_handshakeNew( io,
969                                 manager->handle->encryptionMode,
970                                 myHandshakeDoneCB,
971                                 manager );
972    ++manager->connectionCount;
973
974    tr_ptrArrayInsertSorted( manager->handshakes, handshake, handshakeCompare );
975}
976
977void
978tr_peerMgrAddIncoming( tr_peerMgr      * manager,
979                       struct in_addr  * addr,
980                       uint16_t          port,
981                       int               socket )
982{
983    managerLock( manager );
984
985    if( getExistingHandshake( manager, addr ) == NULL )
986    {
987        tr_peerIo * io = tr_peerIoNewIncoming( manager->handle, addr, port, socket );
988        initiateHandshake( manager, io );
989    }
990
991    managerUnlock( manager );
992}
993
994void
995tr_peerMgrAddPex( tr_peerMgr     * manager,
996                  const uint8_t  * torrentHash,
997                  int              from,
998                  const tr_pex   * pex,
999                  int              pexCount )
1000{
1001    Torrent * t;
1002    const tr_pex * end;
1003
1004    managerLock( manager );
1005
1006    t = getExistingTorrent( manager, torrentHash );
1007    for( end=pex+pexCount; pex!=end; ++pex )
1008        ensureAtomExists( t, &pex->in_addr, pex->port, pex->flags, from );
1009    reconnectSoon( t );
1010
1011    managerUnlock( manager );
1012}
1013
1014void
1015tr_peerMgrAddPeers( tr_peerMgr    * manager,
1016                    const uint8_t * torrentHash,
1017                    int             from,
1018                    const uint8_t * peerCompact,
1019                    int             peerCount )
1020{
1021    int i;
1022    const uint8_t * walk = peerCompact;
1023    Torrent * t;
1024
1025    managerLock( manager );
1026
1027    t = getExistingTorrent( manager, torrentHash );
1028    for( i=0; t!=NULL && i<peerCount; ++i )
1029    {
1030        struct in_addr addr;
1031        uint16_t port;
1032        memcpy( &addr, walk, 4 ); walk += 4;
1033        memcpy( &port, walk, 2 ); walk += 2;
1034        ensureAtomExists( t, &addr, port, 0, from );
1035    }
1036    reconnectSoon( t );
1037
1038    managerUnlock( manager );
1039}
1040
1041/**
1042***
1043**/
1044
1045int
1046tr_peerMgrIsAcceptingConnections( const tr_peerMgr * manager UNUSED )
1047{
1048    return TRUE; /* manager->connectionCount < MAX_CONNECTED_PEERS; */
1049}
1050
1051void
1052tr_peerMgrSetBlame( tr_peerMgr     * manager UNUSED,
1053                    const uint8_t  * torrentHash UNUSED,
1054                    int              pieceIndex UNUSED,
1055                    int              success UNUSED )
1056{
1057    /*fprintf( stderr, "FIXME: tr_peerMgrSetBlame\n" );*/
1058}
1059
1060int
1061tr_pexCompare( const void * va, const void * vb )
1062{
1063    const tr_pex * a = (const tr_pex *) va;
1064    const tr_pex * b = (const tr_pex *) vb;
1065    int i = memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
1066    if( i ) return i;
1067    if( a->port < b->port ) return -1;
1068    if( a->port > b->port ) return 1;
1069    return 0;
1070}
1071
1072int tr_pexCompare( const void * a, const void * b );
1073
1074static int
1075peerPrefersCrypto( const tr_peer * peer )
1076{
1077    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1078        return TRUE;
1079
1080    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1081        return FALSE;
1082
1083    return tr_peerIoIsEncrypted( peer->io );
1084};
1085
1086int
1087tr_peerMgrGetPeers( tr_peerMgr      * manager,
1088                    const uint8_t   * torrentHash,
1089                    tr_pex         ** setme_pex )
1090{
1091    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1092    const int isLocked = torrentIsLocked( t );
1093    int i, peerCount;
1094    const tr_peer ** peers;
1095    tr_pex * pex;
1096    tr_pex * walk;
1097
1098    if( !isLocked )
1099        torrentLock( (Torrent*)t );
1100
1101    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1102    pex = walk = tr_new( tr_pex, peerCount );
1103
1104    for( i=0; i<peerCount; ++i, ++walk )
1105    {
1106        const tr_peer * peer = peers[i];
1107
1108        walk->in_addr = peer->in_addr;
1109
1110        walk->port = peer->port;
1111
1112        walk->flags = 0;
1113        if( peerPrefersCrypto(peer) )  walk->flags |= 1;
1114        if( peer->progress >= 1.0 )    walk->flags |= 2;
1115    }
1116
1117    assert( ( walk - pex ) == peerCount );
1118    qsort( pex, peerCount, sizeof(tr_pex), tr_pexCompare );
1119    *setme_pex = pex;
1120
1121    if( !isLocked )
1122        torrentUnlock( (Torrent*)t );
1123
1124    return peerCount;
1125}
1126
1127void
1128tr_peerMgrStartTorrent( tr_peerMgr     * manager,
1129                        const uint8_t  * torrentHash )
1130{
1131    Torrent * t;
1132
1133    managerLock( manager );
1134
1135    t = getExistingTorrent( manager, torrentHash );
1136    t->isRunning = 1;
1137    restartChokeTimer( t );
1138    reconnectSoon( t );
1139
1140    managerUnlock( manager );
1141}
1142
1143static void
1144stopTorrent( Torrent * t )
1145{
1146    int i, size;
1147    tr_peer ** peers;
1148
1149    assert( torrentIsLocked( t ) );
1150
1151    t->isRunning = 0;
1152    tr_timerFree( &t->rechokeTimer );
1153    tr_timerFree( &t->reconnectTimer );
1154
1155    peers = getConnectedPeers( t, &size );
1156    for( i=0; i<size; ++i )
1157        disconnectPeer( peers[i] );
1158
1159    tr_free( peers );
1160}
1161void
1162tr_peerMgrStopTorrent( tr_peerMgr     * manager,
1163                       const uint8_t  * torrentHash)
1164{
1165    managerLock( manager );
1166
1167    stopTorrent( getExistingTorrent( manager, torrentHash ) );
1168
1169    managerUnlock( manager );
1170}
1171
1172void
1173tr_peerMgrAddTorrent( tr_peerMgr * manager,
1174                      tr_torrent * tor )
1175{
1176    Torrent * t;
1177
1178    managerLock( manager );
1179
1180    assert( tor != NULL );
1181    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
1182
1183    t = tr_new0( Torrent, 1 );
1184    t->manager = manager;
1185    t->tor = tor;
1186    t->pool = tr_ptrArrayNew( );
1187    t->peers = tr_ptrArrayNew( );
1188    t->requested = tr_bitfieldNew( tor->blockCount );
1189    restartChokeTimer( t );
1190    restartReconnectTimer( t );
1191
1192    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
1193    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
1194
1195    managerUnlock( manager );
1196}
1197
1198void
1199tr_peerMgrRemoveTorrent( tr_peerMgr     * manager,
1200                         const uint8_t  * torrentHash )
1201{
1202    Torrent * t;
1203
1204    managerLock( manager );
1205
1206    t = getExistingTorrent( manager, torrentHash );
1207    assert( t != NULL );
1208    stopTorrent( t );
1209    freeTorrent( manager, t );
1210
1211    managerUnlock( manager );
1212}
1213
1214void
1215tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
1216                               const uint8_t    * torrentHash,
1217                               int8_t           * tab,
1218                               int                tabCount )
1219{
1220    int i;
1221    const Torrent * t;
1222    const tr_torrent * tor;
1223    float interval;
1224
1225    managerLock( (tr_peerMgr*)manager );
1226
1227    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1228    tor = t->tor;
1229    interval = tor->info.pieceCount / (float)tabCount;
1230
1231    memset( tab, 0, tabCount );
1232
1233    for( i=0; i<tabCount; ++i )
1234    {
1235        const int piece = i * interval;
1236
1237        if( tor == NULL )
1238            tab[i] = 0;
1239        else if( tr_cpPieceIsComplete( tor->completion, piece ) )
1240            tab[i] = -1;
1241        else {
1242            int j, peerCount;
1243            const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
1244            for( j=0; j<peerCount; ++j )
1245                if( tr_bitfieldHas( peers[j]->have, i ) )
1246                    ++tab[i];
1247        }
1248    }
1249
1250    managerUnlock( (tr_peerMgr*)manager );
1251}
1252
1253/* Returns the pieces that we and/or a connected peer has */
1254tr_bitfield*
1255tr_peerMgrGetAvailable( const tr_peerMgr * manager,
1256                        const uint8_t    * torrentHash )
1257{
1258    int i, size;
1259    const Torrent * t;
1260    const tr_peer ** peers;
1261    tr_bitfield * pieces;
1262
1263    managerLock( (tr_peerMgr*)manager );
1264
1265    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1266    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1267    pieces = tr_bitfieldDup( tr_cpPieceBitfield( t->tor->completion ) );
1268    for( i=0; i<size; ++i )
1269        if( peers[i]->io != NULL )
1270            tr_bitfieldAnd( pieces, peers[i]->have );
1271
1272    managerUnlock( (tr_peerMgr*)manager );
1273    return pieces;
1274}
1275
1276void
1277tr_peerMgrTorrentStats( const tr_peerMgr * manager,
1278                        const uint8_t    * torrentHash,
1279                        int              * setmePeersTotal,
1280                        int              * setmePeersConnected,
1281                        int              * setmePeersSendingToUs,
1282                        int              * setmePeersGettingFromUs,
1283                        int              * setmePeersFrom )
1284{
1285    int i, size;
1286    const Torrent * t;
1287    const tr_peer ** peers;
1288
1289    managerLock( (tr_peerMgr*)manager );
1290
1291    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1292    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1293
1294    *setmePeersTotal          = size;
1295    *setmePeersConnected      = 0;
1296    *setmePeersSendingToUs    = 0;
1297    *setmePeersGettingFromUs  = 0;
1298
1299    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1300        setmePeersFrom[i] = 0;
1301
1302    for( i=0; i<size; ++i )
1303    {
1304        const tr_peer * peer = peers[i];
1305
1306        if( peer->io == NULL ) /* not connected */
1307            continue;
1308
1309        ++*setmePeersConnected;
1310
1311        ++setmePeersFrom[peer->from];
1312
1313        if( tr_peerIoGetRateToPeer( peer->io ) > 0.01 )
1314            ++*setmePeersGettingFromUs;
1315
1316        if( tr_peerIoGetRateToClient( peer->io ) > 0.01 )
1317            ++*setmePeersSendingToUs;
1318    }
1319
1320    managerUnlock( (tr_peerMgr*)manager );
1321}
1322
1323struct tr_peer_stat *
1324tr_peerMgrPeerStats( const tr_peerMgr  * manager,
1325                     const uint8_t     * torrentHash,
1326                     int               * setmeCount UNUSED )
1327{
1328    int i, size;
1329    const Torrent * t;
1330    const tr_peer ** peers;
1331    tr_peer_stat * ret;
1332
1333    assert( manager != NULL );
1334    managerLock( (tr_peerMgr*)manager );
1335
1336    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1337    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1338
1339    ret = tr_new0( tr_peer_stat, size );
1340
1341    for( i=0; i<size; ++i )
1342    {
1343        const tr_peer * peer = peers[i];
1344        const int live = peer->io != NULL;
1345        tr_peer_stat * stat = ret + i;
1346
1347        tr_netNtop( &peer->in_addr, stat->addr, sizeof(stat->addr) );
1348        stat->port             = peer->port;
1349        stat->from             = peer->from;
1350        stat->client           = peer->client;
1351        stat->progress         = peer->progress;
1352        stat->isConnected      = live ? 1 : 0;
1353        stat->isEncrypted      = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1354        stat->uploadToRate     = tr_peerIoGetRateToPeer( peer->io );
1355        stat->downloadFromRate = tr_peerIoGetRateToClient( peer->io );
1356        stat->isDownloading    = stat->uploadToRate > 0.01;
1357        stat->isUploading      = stat->downloadFromRate > 0.01;
1358    }
1359
1360    *setmeCount = size;
1361
1362    managerUnlock( (tr_peerMgr*)manager );
1363    return ret;
1364}
1365
1366/**
1367***
1368**/
1369
1370typedef struct
1371{
1372    tr_peer * peer;
1373    float rate;
1374    int randomKey;
1375    int preferred;
1376    int doUnchoke;
1377}
1378ChokeData;
1379
1380static int
1381compareChoke( const void * va, const void * vb )
1382{
1383    const ChokeData * a = ( const ChokeData * ) va;
1384    const ChokeData * b = ( const ChokeData * ) vb;
1385
1386    if( a->preferred != b->preferred )
1387        return a->preferred ? -1 : 1;
1388
1389    if( a->preferred )
1390    {
1391        if( a->rate > b->rate ) return -1;
1392        if( a->rate < b->rate ) return 1;
1393        return 0;
1394    }
1395    else
1396    {
1397        return a->randomKey - b->randomKey;
1398    }
1399}
1400
1401static int
1402clientIsSnubbedBy( const tr_peer * peer )
1403{
1404    assert( peer != NULL );
1405
1406    return peer->peerSentPieceDataAt < (time(NULL) - SNUBBED_SEC);
1407}
1408
1409/**
1410***
1411**/
1412
1413static void
1414rechokeLeech( Torrent * t )
1415{
1416    int i, peerCount, size=0, unchoked=0;
1417    const time_t ignorePeersNewerThan = time(NULL) - MIN_CHOKE_PERIOD_SEC;
1418    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1419    ChokeData * choke = tr_new0( ChokeData, peerCount );
1420
1421    assert( torrentIsLocked( t ) );
1422   
1423    /* sort the peers by preference and rate */
1424    for( i=0; i<peerCount; ++i )
1425    {
1426        tr_peer * peer = peers[i];
1427        ChokeData * node;
1428        if( peer->chokeChangedAt > ignorePeersNewerThan )
1429            continue;
1430
1431        node = &choke[size++];
1432        node->peer = peer;
1433        node->preferred = peer->peerIsInterested && !clientIsSnubbedBy(peer);
1434        node->randomKey = tr_rand( INT_MAX );
1435        node->rate = tr_peerIoGetRateToClient( peer->io );
1436    }
1437
1438    qsort( choke, size, sizeof(ChokeData), compareChoke );
1439
1440    for( i=0; i<size && i<NUM_UNCHOKED_PEERS_PER_TORRENT; ++i ) {
1441        choke[i].doUnchoke = 1;
1442        ++unchoked;
1443    }
1444
1445    for( ; i<size; ++i ) {
1446        choke[i].doUnchoke = 1;
1447        ++unchoked;
1448        if( choke[i].peer->peerIsInterested )
1449            break;
1450    }
1451
1452    for( i=0; i<size; ++i )
1453        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1454
1455    /* cleanup */
1456    tr_free( choke );
1457    tr_free( peers );
1458}
1459
1460static void
1461rechokeSeed( Torrent * t )
1462{
1463    int i, size;
1464    tr_peer ** peers;
1465
1466    assert( torrentIsLocked( t ) );
1467
1468    peers = getConnectedPeers( t, &size );
1469
1470    /* FIXME */
1471    for( i=0; i<size; ++i )
1472        tr_peerMsgsSetChoke( peers[i]->msgs, FALSE );
1473
1474    tr_free( peers );
1475}
1476
1477static int
1478rechokePulse( void * vtorrent )
1479{
1480    Torrent * t = vtorrent;
1481    torrentLock( t );
1482
1483    const int done = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1484    if( done )
1485        rechokeLeech( vtorrent );
1486    else
1487        rechokeSeed( vtorrent );
1488
1489    torrentUnlock( t );
1490    return TRUE;
1491}
1492
1493/***
1494****
1495****
1496****
1497***/
1498
1499struct tr_connection
1500{
1501    tr_peer * peer;
1502    double throughput;
1503};
1504
1505#define LAISSEZ_FAIRE_PERIOD_SECS 60
1506
1507static int
1508compareConnections( const void * va, const void * vb )
1509{
1510    const struct tr_connection * a = va;
1511    const struct tr_connection * b = vb;
1512    if( a->throughput < b->throughput ) return -1;
1513    if( a->throughput > b->throughput ) return 1;
1514    return 0;
1515}
1516
1517static struct tr_connection *
1518getWeakConnections( Torrent * t, int * setmeSize )
1519{
1520    int i, insize, outsize;
1521    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( t->peers, &insize );
1522    struct tr_connection * ret = tr_new( struct tr_connection, insize );
1523    const int clientIsSeed = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1524    const time_t now = time( NULL );
1525
1526    assert( torrentIsLocked( t ) );
1527
1528    for( i=outsize=0; i<insize; ++i )
1529    {
1530        tr_peer * peer = peers[i];
1531        int isWeak;
1532        const int peerIsSeed = peer->progress >= 1.0;
1533        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
1534        const double throughput = (2*tr_peerIoGetRateToPeer( peer->io ))
1535                                + tr_peerIoGetRateToClient( peer->io );
1536
1537        assert( atom != NULL );
1538
1539        /* if we're both seeds, give a little bit of time for
1540         * a mutual pex -- peer-msgs initiates a pex exchange
1541         * on startup -- and then disconnect */
1542        if( peerIsSeed && clientIsSeed && (now-atom->time >= 30) )
1543            isWeak = TRUE;
1544        else if( ( now - atom->time ) < LAISSEZ_FAIRE_PERIOD_SECS )
1545            isWeak = FALSE;
1546        else if( throughput >= 5 )
1547            isWeak = FALSE;
1548        else
1549            isWeak = TRUE;
1550
1551        if( isWeak )
1552        {
1553            ret[outsize].peer = peer;
1554            ret[outsize].throughput = throughput;
1555            ++outsize;
1556        }
1557    }
1558
1559    qsort( ret, outsize, sizeof(struct tr_connection), compareConnections );
1560    *setmeSize = outsize;
1561    return ret;
1562}
1563
1564static int
1565compareAtomByTime( const void * va, const void * vb )
1566{
1567    const struct peer_atom * a = * (const struct peer_atom**) va;
1568    const struct peer_atom * b = * (const struct peer_atom**) vb;
1569    if( a->time < b->time ) return -1;
1570    if( a->time > b->time ) return 1;
1571    return 0;
1572}
1573
1574static struct peer_atom **
1575getPeerCandidates( Torrent * t, int * setmeSize )
1576{
1577    int i, insize, outsize;
1578    struct peer_atom ** atoms;
1579    struct peer_atom ** ret;
1580    const time_t now = time( NULL );
1581    const int seed = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1582
1583    assert( torrentIsLocked( t ) );
1584
1585    atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &insize );
1586    ret = tr_new( struct peer_atom*, insize );
1587    for( i=outsize=0; i<insize; ++i )
1588    {
1589        struct peer_atom * atom = atoms[i];
1590
1591        /* we don't need two connections to the same peer... */
1592        if( peerIsInUse( t, &atom->addr ) ) {
1593            fprintf( stderr, "RECONNECT peer %d (%s) is in use...\n", i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1594            continue;
1595        }
1596
1597        /* no need to connect if we're both seeds... */
1598        if( seed && ( atom->flags & 2 ) ) {
1599            fprintf( stderr, "RECONNECT peer %d (%s) is a seed and so are we...\n", i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1600            continue;
1601        }
1602
1603        /* if we used this peer recently, give someone else a turn */
1604        if( ( now - atom->time ) <  LAISSEZ_FAIRE_PERIOD_SECS ) {
1605            fprintf( stderr, "RECONNECT peer %d (%s) is in its grace period...\n", i, tr_peerIoAddrStr(&atom->addr,atom->port) );
1606            continue;
1607        }
1608
1609        ret[outsize++] = atom;
1610    }
1611
1612    qsort( ret, outsize, sizeof(struct peer_atom*), compareAtomByTime );
1613    *setmeSize = outsize;
1614    return ret;
1615}
1616
1617static int
1618reconnectPulse( void * vtorrent )
1619{
1620    Torrent * t = vtorrent;
1621    struct peer_atom ** candidates;
1622    struct tr_connection * connections;
1623    int i, nCandidates, nConnections, nCull, nAdd;
1624    int peerCount;
1625
1626    torrentLock( t );
1627
1628    connections = getWeakConnections( t, &nConnections );
1629    candidates = getPeerCandidates( t, &nCandidates );
1630
1631    /* figure out how many peers to disconnect */
1632    nCull = nConnections-4; 
1633
1634fprintf( stderr, "RECONNECT pulse for [%s]: %d connections, %d candidates, %d atoms, %d cull\n", t->tor->info.name, nConnections, nCandidates, tr_ptrArraySize(t->pool), nCull );
1635
1636for( i=0; i<nConnections; ++i )
1637fprintf( stderr, "connection #%d: %s @ %.2f\n", i+1, tr_peerIoAddrStr( &connections[i].peer->in_addr, connections[i].peer->port ), connections[i].throughput );
1638
1639    /* disconnect some peers */
1640    for( i=0; i<nCull && i<nConnections; ++i ) {
1641        const double throughput = connections[i].throughput;
1642        tr_peer * peer = connections[i].peer;
1643        fprintf( stderr, "RECONNECT culling peer %s, whose throughput was %f\n", tr_peerIoAddrStr(&peer->in_addr, peer->port), throughput );
1644        removePeer( t, peer );
1645    }
1646
1647    /* add some new ones */
1648    peerCount = tr_ptrArraySize( t->peers );
1649    nAdd = MAX_CONNECTED_PEERS_PER_TORRENT - peerCount;
1650    for( i=0; i<nAdd && i<nCandidates; ++i ) {
1651        struct peer_atom * atom = candidates[i];
1652        tr_peerIo * io = tr_peerIoNewOutgoing( t->manager->handle, &atom->addr, atom->port, t->hash );
1653fprintf( stderr, "RECONNECT adding an outgoing connection...\n" );
1654        initiateHandshake( t->manager, io );
1655        atom->time = time( NULL );
1656    }
1657
1658    /* cleanup */
1659    tr_free( connections );
1660    tr_free( candidates );
1661    torrentUnlock( t );
1662    return TRUE;
1663}
Note: See TracBrowser for help on using the repository browser.